Skip to content

[17.0] [IMP] queue_job: add_depends on RetryableJobError #786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 17.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,16 @@
http.request.session.db = db
env = http.request.env(user=SUPERUSER_ID)

def retry_postpone(job, message, seconds=None):
def retry_postpone(job, message, seconds=None, add_depends=None):

Check warning on line 79 in queue_job/controllers/main.py

View check run for this annotation

Codecov / codecov/patch

queue_job/controllers/main.py#L79

Added line #L79 was not covered by tests
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
new_env = api.Environment(new_cr, SUPERUSER_ID, {})
job.env = new_env

Check warning on line 83 in queue_job/controllers/main.py

View check run for this annotation

Codecov / codecov/patch

queue_job/controllers/main.py#L82-L83

Added lines #L82 - L83 were not covered by tests
job.postpone(result=message, seconds=seconds)
if add_depends:
for dependency in add_depends:
dependency.env = new_env
dependency.store()

Check warning on line 88 in queue_job/controllers/main.py

View check run for this annotation

Codecov / codecov/patch

queue_job/controllers/main.py#L87-L88

Added lines #L87 - L88 were not covered by tests
job.set_pending(reset_retry=False)
job.store()

Expand Down Expand Up @@ -126,7 +131,9 @@

except RetryableJobError as err:
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
retry_postpone(

Check warning on line 134 in queue_job/controllers/main.py

View check run for this annotation

Codecov / codecov/patch

queue_job/controllers/main.py#L134

Added line #L134 was not covered by tests
job, str(err), seconds=err.seconds, add_depends=err.add_depends
)
_logger.debug("%s postponed", job)
# Do not trigger the error up because we don't want an exception
# traceback in the logs we should have the traceback when all
Expand Down
6 changes: 5 additions & 1 deletion queue_job/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ class RetryableJobError(JobError):
by :const:`odoo.addons.queue_job.job.RETRY_INTERVAL` if nothing is defined.

If ``ignore_retry`` is True, the retry counter will not be increased.

If ``add_depends`` is provided, the jobs will be added as dependencies to
the current job.
"""

def __init__(self, msg, seconds=None, ignore_retry=False):
def __init__(self, msg, seconds=None, ignore_retry=False, add_depends=None):
super().__init__(msg)
self.seconds = seconds
self.ignore_retry = ignore_retry
self.add_depends = add_depends


# TODO: remove support of NothingToDo: too dangerous
Expand Down
5 changes: 5 additions & 0 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,11 @@ def perform(self):
try:
self.result = self.func(*tuple(self.args), **self.kwargs)
except RetryableJobError as err:
if err.add_depends:
from .delay import DelayableGraph
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curiosity: why do we import this here instead of the file's top? Any specific technical reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid a circular dependency


DelayableGraph._ensure_same_graph_uuid([self] + err.add_depends)
self.add_depends(err.add_depends)
if err.ignore_retry:
self.retry -= 1
raise
Expand Down
15 changes: 15 additions & 0 deletions test_queue_job/models/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ def create_ir_logging(self, message, level="info"):
}
)

def job_with_retry_and_new_dependency(self):
logging_domain = [
("name", "=", "test_queue_job"),
("message", "=", "job_with_retry_and_new_dependency"),
]
if not self.env["ir.logging"].search_count(logging_domain):
new_job = self.with_delay().create_ir_logging(
message="job_with_retry_and_new_dependency"
)
raise RetryableJobError(
"Must be retried after creating the logging",
add_depends=[new_job],
)
return True

def no_description(self):
return

Expand Down
34 changes: 34 additions & 0 deletions test_queue_job/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ def test_infinite_retryable_error(self):
test_job.perform()
self.assertEqual(test_job.retry, 1)

def test_retryable_error_with_new_dependency(self):
job = Job(self.env["test.queue.job"].job_with_retry_and_new_dependency)
job.store()
with self.assertRaises(RetryableJobError):
job.perform()
job.store()
self.assertEqual(job.retry, 1)
self.assertEqual(len(job.depends_on), 1, "There's a new dependency for the job")
new_job = next(iter(job.depends_on))
self.assertEqual(
len(new_job.reverse_depends_on),
1,
"There's a reverse dependency for the new job",
)
self.assertEqual(
next(iter(new_job.reverse_depends_on)),
job,
"They are bi-directionally linked",
)
self.assertEqual(job.state, "wait_dependencies")
self.assertEqual(new_job.state, "pending")
job.store()
new_job.store()
# Now run the dependency
new_job.perform()
new_job.set_done()
new_job.store()
self.env.flush_all()
new_job.enqueue_waiting()
# Force a reload of the job from the db state
job = Job.load(self.env, job.uuid)
self.assertEqual(job.state, "pending")
job.perform()

def test_on_instance_method(self):
class A:
def method(self):
Expand Down