Skip to content

Commit baccb0e

Browse files
committed
Update community caches in parallel
Refactor the task for updating chunked community package caches so that each community is updated in parallel. Introduce a new task, spawned by the main update task once per community. Remove the function responsible for updating all community caches and introduce a new function that does the same thing for a single community, taking the community as a parameter. Update the cache tests because the old cache update function was removed. Add update_single_community_cache to the celery tests. Add a create_batch method to CommunityFactory to help with testing. Add a test for the updated celery task to assert that the child tasks are indeed spawned per community. Refs. TS-2378
1 parent a4c8e38 commit baccb0e

File tree

6 files changed

+77
-15
lines changed

6 files changed

+77
-15
lines changed

django/thunderstore/community/factories.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ class Meta:
2222
name = factory.Sequence(lambda n: f"TestCommunity{n}")
2323
identifier = factory.Sequence(lambda n: f"test-community-{n}")
2424

25+
@classmethod
26+
def create_batch(cls, size=1, **kwargs):
27+
return [
28+
cls(**{**kwargs, "identifier": f"test-community-{i}"}) for i in range(size)
29+
]
30+
2531

2632
class PackageCategoryFactory(DjangoModelFactory):
2733
class Meta:

django/thunderstore/core/tests/test_celery.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def test_task():
3232
"thunderstore.core.tasks.celery_post",
3333
"thunderstore.cache.tasks.invalidate_cache",
3434
"thunderstore.repository.tasks.update_api_caches",
35+
"thunderstore.repository.tasks.update_single_community_cache",
3536
"thunderstore.usermedia.tasks.celery_cleanup_expired_uploads",
3637
"thunderstore.schema_import.tasks.sync_ecosystem_schema",
3738
"thunderstore.repository.tasks.files.extract_package_version_file_tree",

django/thunderstore/repository/api/v1/tasks.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,9 @@ def update_api_v1_indexes() -> None:
3232
APIV1PackageCache.drop_stale_cache()
3333

3434

35-
def update_api_v1_chunked_package_caches() -> None:
36-
for community in Community.objects.iterator():
37-
try:
38-
APIV1ChunkedPackageCache.update_for_community(community)
39-
except Exception as e: # pragma: no cover
40-
capture_exception(e)
41-
35+
def update_api_v1_chunked_package_cache_for_community(community: Community) -> None:
36+
try:
37+
APIV1ChunkedPackageCache.update_for_community(community)
38+
except Exception as e: # pragma: no cover
39+
capture_exception(e)
4240
APIV1ChunkedPackageCache.drop_stale_cache()

django/thunderstore/repository/api/v1/tests/test_caches.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from thunderstore.community.models import Community, CommunitySite, PackageListing
1616
from thunderstore.repository.api.v1.tasks import (
1717
update_api_v1_caches,
18-
update_api_v1_chunked_package_caches,
18+
update_api_v1_chunked_package_cache_for_community,
1919
)
2020
from thunderstore.repository.models import APIV1ChunkedPackageCache, APIV1PackageCache
2121
from thunderstore.repository.models.cache import get_package_listing_chunk
@@ -155,7 +155,7 @@ def test_api_v1_chunked_package_cache__builds_index_and_chunks(
155155
PackageListingFactory(community_=community)
156156
assert APIV1ChunkedPackageCache.get_latest_for_community(community) is None
157157

158-
update_api_v1_chunked_package_caches()
158+
update_api_v1_chunked_package_cache_for_community(community)
159159
cache = APIV1ChunkedPackageCache.get_latest_for_community(community)
160160
assert cache is not None
161161
assert cache.index.data_url.startswith(settings.AWS_S3_ENDPOINT_URL)
@@ -171,10 +171,12 @@ def test_api_v1_chunked_package_cache__drops_stale_caches() -> None:
171171
"""
172172
Caches are currently only soft deleted.
173173
"""
174-
PackageListingFactory()
174+
listing = PackageListingFactory()
175+
community = listing.community
176+
175177
assert not APIV1ChunkedPackageCache.objects.exists()
176178

177-
update_api_v1_chunked_package_caches()
179+
update_api_v1_chunked_package_cache_for_community(community)
178180
first_cache = APIV1ChunkedPackageCache.objects.get()
179181
assert not first_cache.is_deleted
180182

@@ -183,7 +185,7 @@ def test_api_v1_chunked_package_cache__drops_stale_caches() -> None:
183185
assert not first_cache.is_deleted
184186

185187
# Two caches exist, but neither is beyond the cutoff period.
186-
update_api_v1_chunked_package_caches()
188+
update_api_v1_chunked_package_cache_for_community(community)
187189
APIV1ChunkedPackageCache.drop_stale_cache()
188190
second_cache = APIV1ChunkedPackageCache.get_latest_for_community(
189191
first_cache.community,

django/thunderstore/repository/tasks/caches.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1-
from celery import shared_task # type: ignore
1+
import logging
22

3+
from celery import group, shared_task # type: ignore
4+
5+
from thunderstore.community.models import Community
36
from thunderstore.core.settings import CeleryQueues
47
from thunderstore.repository.api.experimental.views.package_index import (
58
update_api_experimental_package_index,
69
)
710
from thunderstore.repository.api.v1.tasks import (
811
update_api_v1_caches,
9-
update_api_v1_chunked_package_caches,
12+
update_api_v1_chunked_package_cache_for_community,
1013
)
1114

15+
logger = logging.getLogger(__name__)
16+
1217

1318
@shared_task(
1419
name="thunderstore.repository.tasks.update_api_caches",
@@ -28,11 +33,37 @@ def update_experimental_package_index():
2833
update_api_experimental_package_index()
2934

3035

36+
@shared_task(
37+
name="thunderstore.repository.tasks.update_single_community_cache",
38+
queue=CeleryQueues.BackgroundLongRunning,
39+
expires=60 * 5,
40+
soft_time_limit=60 * 15,
41+
)
42+
def update_single_community_cache(community_pk: int):
43+
"""Update the package cache for a single community."""
44+
45+
community = Community.objects.get(pk=community_pk)
46+
update_api_v1_chunked_package_cache_for_community(community)
47+
48+
3149
@shared_task(
3250
name="thunderstore.repository.tasks.update_chunked_package_caches",
3351
queue=CeleryQueues.BackgroundLongRunning,
3452
soft_time_limit=60 * 60 * 23,
3553
time_limit=60 * 60 * 24,
3654
)
3755
def update_chunked_community_package_caches():
38-
update_api_v1_chunked_package_caches()
56+
"""Update the package caches for all communities in parallel."""
57+
58+
try:
59+
communities = Community.objects.values_list("pk", flat=True).iterator()
60+
61+
task_group = group(
62+
update_single_community_cache.s(community_pk)
63+
for community_pk in communities
64+
)
65+
66+
result = task_group.apply_async()
67+
return result
68+
except Exception as e: # pragma: no cover
69+
logger.error(f"Failed starting parallel update of community caches. Error: {e}")
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import pytest
2+
3+
from thunderstore.community.factories import CommunityFactory
4+
from thunderstore.repository.tasks import update_chunked_community_package_caches
5+
6+
7+
@pytest.mark.django_db
8+
def test_update_chunked_community_package_caches_in_parallel():
9+
"""
10+
Test that the celery task creates sub-tasks for each community and that they
11+
run successfully.
12+
"""
13+
14+
size = 5
15+
CommunityFactory.create_batch(size=size)
16+
result = update_chunked_community_package_caches.apply_async()
17+
result = result.get()
18+
sub_task_results = result.children
19+
20+
assert result.successful()
21+
for sub_task in sub_task_results:
22+
assert sub_task.successful()
23+
24+
assert len(sub_task_results) == size

0 commit comments

Comments
 (0)