Skip to content

Commit 7605768

Browse files
yadudockhk-globus
authored andcommitted
Make MPITaskScheduler prioritize large tasks (#3805)
# Description The python `PriorityQueue` prioritizes items with smaller priority values, which leads us to our current `MPITaskScheduler` prioritizing MPI tasks with fewer nodes requested since the nodes requested is used as the priority. This PR changes the ordering to prefer larger jobs. # Changed Behaviour MPI Tasks with larger node requests are prioritized by the manager. Please note that as @WardLT pointed out in the comments on #3783, the greedy scheduling that we use can lead to larger tasks getting delayed in favor of smaller tasks that can get scheduled immediately. These are change split from #3783 to keep the PR concise. This is split 3 of 3. ## Type of change Choose which options apply, and delete the ones which do not apply. - Update to human readable text: Documentation/error messages/comments
1 parent 14c339d commit 7605768

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

parsl/executors/high_throughput/mpi_resource_management.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ def put_task(self, task_package: dict):
196196
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])
197197

198198
nodes_needed = resource_spec.get("num_nodes")
199-
prioritized_task = PrioritizedTask(priority=nodes_needed,
199+
# Prioritize large jobs
200+
prioritized_task = PrioritizedTask(priority=-1 * nodes_needed,
200201
task=task_package,
201202
unpacked_task=(_f, _args, _kwargs, resource_spec),
202203
nodes_needed=nodes_needed)

parsl/tests/test_mpi_apps/test_mpi_scheduler.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import os
33
import pickle
4+
import random
45
from unittest import mock
56

67
import pytest
@@ -218,3 +219,38 @@ def test_tiny_large_loop():
218219
got_result = scheduler.get_result(True, 1)
219220

220221
assert got_result == result_pkl
222+
223+
224+
@pytest.mark.local
225+
def test_larger_jobs_prioritized():
226+
"""Larger jobs should be scheduled first"""
227+
228+
task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
229+
scheduler = MPITaskScheduler(task_q, result_q)
230+
231+
max_nodes = len(scheduler.available_nodes)
232+
233+
# The first task will get scheduled with all the nodes,
234+
# and the remainder hits the backlog queue.
235+
node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]
236+
237+
for task_id, num_nodes in enumerate(node_request_list):
238+
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
239+
resource_specification={
240+
"num_nodes": num_nodes,
241+
"ranks_per_node": 2
242+
})
243+
task_package = {"task_id": task_id, "buffer": mock_task_buffer}
244+
scheduler.put_task(task_package)
245+
246+
# Confirm that the tasks are sorted in decreasing order
247+
output_priority = []
248+
for i in range(len(node_request_list) - 1):
249+
p_task = scheduler._backlog_queue.get()
250+
output_priority.append(p_task.nodes_needed)
251+
252+
# Remove the first large job that blocks the nodes and forces following
253+
# tasks into backlog
254+
expected_priority = node_request_list[1:]
255+
expected_priority.sort(reverse=True)
256+
assert expected_priority == output_priority, "Expected nodes in decreasing sorted order"

0 commit comments

Comments
 (0)