Skip to content

Commit 6f1c342

Browse files
authored
Fix background job email failures (#3045)
Fixes #3044

- Use `asyncio.create_task` to send background job failure emails (tracking the tasks in a set on the ops class to avoid garbage collection)
- Update the job in the database before sending emails
- Add try/except around email sending so failures are logged instead of raising an uncaught exception
- Rename `handle_replica_job_finished` to `handle_replica_job_succeeded` to better reflect intent
- Fix the failed background job email template and ensure that the job sent to it is JSON-serializable with `model_dump(mode="json")`
1 parent 2750c38 commit 6f1c342

File tree

5 files changed

+62
-45
lines changed

5 files changed

+62
-45
lines changed

backend/btrixcloud/background_jobs.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,27 @@ def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops
8181
responses={404: {"description": "Not found"}},
8282
)
8383

84+
# to avoid background tasks being garbage collected
85+
# see: https://stackoverflow.com/a/74059981
86+
self.bg_tasks = set()
87+
8488
def set_ops(self, base_crawl_ops: BaseCrawlOps, profile_ops: ProfileOps) -> None:
8589
"""basecrawlops and profileops for updating files"""
8690
self.base_crawl_ops = base_crawl_ops
8791
self.profile_ops = profile_ops
8892

93+
def _run_task(self, func) -> None:
94+
"""add bg tasks to set to avoid premature garbage collection"""
95+
task = asyncio.create_task(func)
96+
self.bg_tasks.add(task)
97+
task.add_done_callback(self.bg_tasks.discard)
98+
8999
def strip_bucket(self, endpoint_url: str) -> tuple[str, str]:
90100
"""split the endpoint_url into the origin and return rest of endpoint as bucket path"""
91101
parts = urlsplit(endpoint_url)
92102
return parts.scheme + "://" + parts.netloc + "/", parts.path[1:]
93103

94-
async def handle_replica_job_finished(self, job: CreateReplicaJob) -> None:
104+
async def handle_replica_job_succeeded(self, job: CreateReplicaJob) -> None:
95105
"""Update replicas in corresponding file objects, based on type"""
96106
res = None
97107
if job.object_type in ("crawl", "upload"):
@@ -516,36 +526,41 @@ async def job_finished(
516526
raise HTTPException(status_code=400, detail="invalid_job_type")
517527

518528
if success and job_type == BgJobType.CREATE_REPLICA:
519-
await self.handle_replica_job_finished(cast(CreateReplicaJob, job))
529+
await self.handle_replica_job_succeeded(cast(CreateReplicaJob, job))
520530

521531
if job_type == BgJobType.DELETE_REPLICA:
522532
await self.handle_delete_replica_job_finished(cast(DeleteReplicaJob, job))
523533

524-
if not success:
525-
await self._send_bg_job_failure_email(job, finished)
526-
527534
await self.jobs.find_one_and_update(
528535
{"_id": job_id, "oid": oid},
529536
{"$set": {"success": success, "finished": finished}},
530537
)
531538

539+
if not success:
540+
await self._send_bg_job_failure_email(job, finished)
541+
532542
async def _send_bg_job_failure_email(self, job: BackgroundJob, finished: datetime):
533543
print(
534544
f"Background job {job.id} failed, sending email to superuser",
535545
flush=True,
536546
)
537-
superuser = await self.user_manager.get_superuser()
538-
org = None
539-
if job.oid:
540-
org = await self.org_ops.get_org_by_id(job.oid)
541-
await asyncio.get_event_loop().run_in_executor(
542-
None,
543-
self.email.send_background_job_failed,
544-
job,
545-
finished,
546-
superuser.email,
547-
org,
548-
)
547+
try:
548+
superuser = await self.user_manager.get_superuser()
549+
org = None
550+
if job.oid:
551+
org = await self.org_ops.get_org_by_id(job.oid)
552+
553+
self._run_task(
554+
self.email.send_background_job_failed(
555+
job, finished, superuser.email, org
556+
)
557+
)
558+
# pylint: disable=broad-exception-caught
559+
except Exception as err:
560+
print(
561+
f"Error sending bg job failure email for job {job.id}: {err}",
562+
flush=True,
563+
)
549564

550565
async def get_background_job(
551566
self, job_id: str, oid: Optional[UUID] = None
@@ -780,11 +795,8 @@ async def retry_failed_org_background_jobs(
780795
Keep track of tasks in set to prevent them from being garbage collected
781796
See: https://stackoverflow.com/a/74059981
782797
"""
783-
bg_tasks = set()
784798
async for job in self.jobs.find({"oid": org.id, "success": False}):
785-
task = asyncio.create_task(self.retry_background_job(job["_id"], org))
786-
bg_tasks.add(task)
787-
task.add_done_callback(bg_tasks.discard)
799+
self._run_task(self.retry_background_job(job["_id"], org))
788800
return {"success": True}
789801

790802
async def retry_all_failed_background_jobs(
@@ -795,14 +807,11 @@ async def retry_all_failed_background_jobs(
795807
Keep track of tasks in set to prevent them from being garbage collected
796808
See: https://stackoverflow.com/a/74059981
797809
"""
798-
bg_tasks = set()
799810
async for job in self.jobs.find({"success": False}):
800811
org = None
801812
if job.get("oid"):
802813
org = await self.org_ops.get_org_by_id(job["oid"])
803-
task = asyncio.create_task(self.retry_background_job(job["_id"], org))
804-
bg_tasks.add(task)
805-
task.add_done_callback(bg_tasks.discard)
814+
self._run_task(self.retry_background_job(job["_id"], org))
806815
return {"success": True}
807816

808817

backend/btrixcloud/emailsender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ async def send_background_job_failed(
197197
await self._send_encrypted(
198198
receiver_email,
199199
"failedBgJob",
200-
job=job,
200+
job=job.model_dump(mode="json"),
201201
org=str(org.id) if org else None,
202202
finished=finished.isoformat(),
203203
)

backend/btrixcloud/operator/bgjobs.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict:
4343
success = status.get("succeeded") == spec.get("parallelism")
4444
if not success:
4545
print(
46-
"Succeeded: {status.get('succeeded')}, Num Pods: {spec.get('parallelism')}"
46+
f"Succeeded: {status.get('succeeded')}, Num Pods: {spec.get('parallelism')}"
4747
)
4848
start_time = status.get("startTime")
4949
completion_time = status.get("completionTime")
@@ -75,10 +75,6 @@ async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict:
7575
finished=finished,
7676
oid=org_id,
7777
)
78-
# print(
79-
# f"{job_type} background job completed: success: {success}, {job_id}",
80-
# flush=True,
81-
# )
8278

8379
# pylint: disable=broad-except
8480
except Exception:

backend/test/test_emails.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,14 @@
1010
import pytest
1111

1212
from btrixcloud.emailsender import EmailSender
13-
from btrixcloud.models import Organization, InvitePending, EmailStr, StorageRef
13+
from btrixcloud.models import (
14+
Organization,
15+
InvitePending,
16+
EmailStr,
17+
StorageRef,
18+
CreateReplicaJob,
19+
)
20+
from btrixcloud.utils import dt_now
1421

1522
EMAILS_HOST_PREFIX = (
1623
os.environ.get("EMAIL_TEMPLATE_ENDPOINT") or "http://127.0.0.1:30872"
@@ -193,17 +200,19 @@ async def test_send_password_reset(email_sender, capsys):
193200
@pytest.mark.asyncio
194201
async def test_send_background_job_failed(email_sender, sample_org, capsys):
195202
"""Test sending background job failure notification"""
196-
# Create a mock job
197-
job = {
198-
"id": str(uuid.uuid4()),
199-
"type": "create_replica",
200-
"crawl_id": "test_crawl_123",
201-
"started": datetime.now().isoformat(),
202-
}
203-
finished = datetime.now()
203+
job = CreateReplicaJob(
204+
id="fake-create-replica-job",
205+
oid=sample_org.id,
206+
success=False,
207+
started=dt_now(),
208+
file_path="path/to/file.wacz",
209+
object_type="crawl",
210+
object_id="sample-crawl-id",
211+
replica_storage=StorageRef(name="test-storage"),
212+
)
204213

205214
await email_sender.send_background_job_failed(
206-
job=job, finished=finished, receiver_email="[email protected]", org=sample_org
215+
job=job, finished=dt_now(), receiver_email="[email protected]", org=sample_org
207216
)
208217

209218
# Check log output

emails/emails/failed-bg-job.tsx

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ export const schema = z.object({
1313
object_type: z.string().optional(),
1414
object_id: z.string().optional(),
1515
file_path: z.string().optional(),
16-
replica_storage: z.string().optional(),
16+
replica_storage: z.object({
17+
name: z.string(),
18+
custom: z.boolean(),
19+
}).optional(),
1720
}),
1821
finished: z.coerce.date(),
1922
});
@@ -84,9 +87,9 @@ export const FailedBgJobEmail = ({
8487
<Code>{job.file_path}</Code>
8588
</DataRow>
8689
)}
87-
{job.replica_storage && (
90+
{job.replica_storage && job.replica_storage.name && (
8891
<DataRow label="Replica Storage">
89-
<Code>{job.replica_storage}</Code>
92+
<Code>{job.replica_storage.name}</Code>
9093
</DataRow>
9194
)}
9295
</table>
@@ -104,7 +107,7 @@ FailedBgJobEmail.PreviewProps = {
104107
object_type: "object_type",
105108
object_id: "object_id",
106109
file_path: "file_path",
107-
replica_storage: "replica_storage",
110+
replica_storage: {name: "replica_storage", custom: false},
108111
},
109112
finished: new Date(),
110113
} satisfies FailedBgJobEmailProps;

0 commit comments

Comments
 (0)