Skip to content

Commit 6c678f2

Browse files
committed
Move the job status part of sync into a helper function
1 parent 2153ba1 commit 6c678f2

File tree

1 file changed

+27
-22
lines changed

1 file changed

+27
-22
lines changed

controller/sync.py

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,33 +66,38 @@ def sync_backend(backend):
6666
if not job_requests:
6767
return
6868

69-
with duration_ms_as_span_attr("find_ids.duration_ms", span):
70-
job_request_ids = [i.id for i in job_requests]
71-
7269
with duration_ms_as_span_attr("create.duration_ms", span):
7370
for job_request in job_requests:
7471
with set_log_context(job_request=job_request):
7572
create_or_update_jobs(job_request)
7673

77-
# `job_request_ids` contains all the JobRequests which job-server thinks are
78-
# active; this query gets all those which _we_ think are active
79-
with duration_ms_as_span_attr("find_more_ids.duration_ms", span):
80-
active_job_request_ids = select_values(
81-
Job, "job_request_id", state__in=[State.PENDING, State.RUNNING]
82-
)
83-
# We sync all jobs belonging to either set (using `dict.fromkeys` to preserve order
84-
# for easier testing)
85-
job_request_ids_to_sync = list(
86-
dict.fromkeys(job_request_ids + active_job_request_ids)
87-
)
88-
with duration_ms_as_span_attr("find_where.duration_ms", span):
89-
jobs = find_where(Job, job_request_id__in=job_request_ids_to_sync)
90-
with duration_ms_as_span_attr("encode_jobs.duration_ms", span):
91-
jobs_data = [job_to_remote_format(i) for i in jobs]
92-
log.debug(f"Syncing {len(jobs_data)} jobs back to job-server")
93-
94-
with duration_ms_as_span_attr("api_post.duration_ms", span):
95-
api_post("jobs", backend=backend, json=jobs_data)
74+
sync_backend_jobs_status(backend, job_requests, span)
75+
76+
77+
# TODO: this function will be replaced by a call to the RAP API rap/status
78+
def sync_backend_jobs_status(backend, job_requests, span):
79+
with duration_ms_as_span_attr("find_ids.duration_ms", span):
80+
job_request_ids = [i.id for i in job_requests]
81+
82+
# `job_request_ids` contains all the JobRequests which job-server thinks are
83+
# active; this query gets all those which _we_ think are active
84+
with duration_ms_as_span_attr("find_more_ids.duration_ms", span):
85+
active_job_request_ids = select_values(
86+
Job, "job_request_id", state__in=[State.PENDING, State.RUNNING]
87+
)
88+
# We sync all jobs belonging to either set (using `dict.fromkeys` to preserve order
89+
# for easier testing)
90+
job_request_ids_to_sync = list(
91+
dict.fromkeys(job_request_ids + active_job_request_ids)
92+
)
93+
with duration_ms_as_span_attr("find_where.duration_ms", span):
94+
jobs = find_where(Job, job_request_id__in=job_request_ids_to_sync)
95+
with duration_ms_as_span_attr("encode_jobs.duration_ms", span):
96+
jobs_data = [job_to_remote_format(i) for i in jobs]
97+
log.debug(f"Syncing {len(jobs_data)} jobs back to job-server")
98+
99+
with duration_ms_as_span_attr("api_post.duration_ms", span):
100+
api_post("jobs", backend=backend, json=jobs_data)
96101

97102

98103
def api_get(*args, backend, **kwargs):

0 commit comments

Comments
 (0)