Skip to content

fix: implement create_user_task workaround #467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/ol_openedx_course_export/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ class CourseExportConfig(AppConfig):
}
},
}

def ready(self):
"""
Import signals to enable celery task protocol workaround
"""
import ol_openedx_course_export.signals # noqa: F401
64 changes: 64 additions & 0 deletions src/ol_openedx_course_export/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Signal handlers for the ol_openedx_course_export app.

This module implements a workaround for a compatibility issue between django-user-tasks
and Celery's protocol version 2. The create_user_task signal handler expects protocol
version 1 format but receives version 2, causing TypeError exceptions in the logs.

This fix wraps the original handler, converts protocol v2 messages to v1 format, and
ensures UserTask-based tasks work properly without generating errors.
"""

from celery import chain, signals
from cms.celery import APP
from user_tasks.signals import create_user_task

signals.before_task_publish.disconnect(create_user_task)


def create_user_task_wrapper(sender=None, body=None, **kwargs):
return create_user_task(
sender,
body
if APP.conf.task_protocol == 1
else proto2_to_proto1(body, kwargs.get("headers", {})),
)


signals.before_task_publish.connect(create_user_task_wrapper)


def proto2_to_proto1(body, headers):
args, kwargs, embed = body
embedded = _extract_proto2_embed(**embed)
chained = embedded.pop("chain")
new_body = dict(
_extract_proto2_headers(**headers), args=args, kwargs=kwargs, **embedded
)
if chained:
new_body["callbacks"].append(chain(chained))
return new_body


def _extract_proto2_headers( # noqa: PLR0913
task_id, retries, eta, expires, group, timelimit, task, **_
):
return {
"id": task_id,
"task": task,
"retries": retries,
"eta": eta,
"expires": expires,
"utc": True,
"taskset": group,
"timelimit": timelimit,
}


def _extract_proto2_embed(callbacks, errbacks, chain, chord, **_):
return {
"callbacks": callbacks or [],
"errbacks": errbacks,
"chain": chain,
"chord": chord,
}
Loading