Skip to content

Commit be40889

Browse files
Address review comments
1 parent cc1ef6c commit be40889

File tree

7 files changed

+140
-131
lines changed

7 files changed

+140
-131
lines changed

README.md

Lines changed: 61 additions & 49 deletions
Large diffs are not rendered by default.

pytest_parallel/mpi_reporter.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import numpy as np
21
import pytest
32
import sys
43
from mpi4py import MPI
@@ -321,7 +320,7 @@ def schedule_test(item, available_procs, inter_comm):
321320

322321
# mark the procs as busy
323322
for sub_rank in sub_ranks:
324-
available_procs[sub_rank] = False
323+
available_procs[sub_rank] = 0
325324

326325
# TODO isend would be slightly better (less waiting)
327326
for sub_rank in sub_ranks:
@@ -357,15 +356,15 @@ def wait_test_to_complete(items_to_run, session, available_procs, inter_comm):
357356

358357
# the procs are now available
359358
for sub_rank in sub_ranks:
360-
available_procs[sub_rank] = True
359+
available_procs[sub_rank] = 1
361360

362361
# "run" the test (i.e. trigger PyTest pipeline but do not really run the code)
363362
nextitem = None # not known at this point
364363
run_item_test(item, nextitem, session)
365364

366365

367366
def wait_last_tests_to_complete(items_to_run, session, available_procs, inter_comm):
368-
while np.sum(available_procs) < len(available_procs):
367+
while sum(available_procs) < len(available_procs):
369368
wait_test_to_complete(items_to_run, session, available_procs, inter_comm)
370369

371370

@@ -450,10 +449,10 @@ def pytest_runtestloop(self, session) -> bool:
450449

451450
# schedule tests to run
452451
items_left_to_run = sorted(items_to_run, key=lambda item: item.n_proc)
453-
available_procs = np.ones(n_workers, dtype=np.int8)
452+
available_procs = [1] * n_workers
454453

455454
while len(items_left_to_run) > 0:
456-
n_av_procs = np.sum(available_procs)
455+
n_av_procs = sum(available_procs)
457456

458457
item_idx = item_with_biggest_admissible_n_proc(items_left_to_run, n_av_procs)
459458

pytest_parallel/plugin.py

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import pytest
1111
from _pytest.terminal import TerminalReporter
1212

13+
class PytestParallelError(ValueError):
14+
pass
15+
1316
# --------------------------------------------------------------------------
1417
def pytest_addoption(parser):
1518
parser.addoption(
@@ -26,9 +29,8 @@ def pytest_addoption(parser):
2629

2730
parser.addoption('--slurm-options', dest='slurm_options', type=str, help='list of SLURM options e.g. "--time=00:30:00 --qos=my_queue"')
2831
parser.addoption('--slurm-srun-options', dest='slurm_srun_options', type=str, help='list of SLURM srun options e.g. "--mem-per-cpu=4GB"')
29-
parser.addoption('--slurm-additional-cmds', dest='slurm_additional_cmds', type=str, help='list of commands to pass to SLURM job e.g. "source my_env.sh"')
32+
parser.addoption('--slurm-init-cmds', dest='slurm_init_cmds', type=str, help='list of commands to pass to SLURM job e.g. "source my_env.sh"')
3033
parser.addoption('--slurm-file', dest='slurm_file', type=str, help='Path to file containing header of SLURM job') # TODO DEL
31-
parser.addoption('--slurm-sub-command', dest='slurm_sub_command', type=str, help='SLURM submission command (defaults to `sbatch`)') # TODO DEL
3234

3335
if sys.version_info >= (3,9):
3436
parser.addoption('--slurm-export-env', dest='slurm_export_env', action=argparse.BooleanOptionalAction, default=True)
@@ -51,19 +53,21 @@ def pytest_addoption(parser):
5153
# because it can mess SLURM `srun`
5254
if "--scheduler=slurm" in sys.argv:
5355
assert 'mpi4py.MPI' not in sys.modules, 'Internal pytest_parallel error: mpi4py.MPI should not be imported' \
54-
' when we are about to register and environment for SLURM' \
56+
' when we are about to register an environment for SLURM' \
5557
' (because importing mpi4py.MPI makes the current process look like an MPI process,' \
5658
' and SLURM does not like that)'
57-
assert os.getenv('I_MPI_MPIRUN') is None, 'Internal pytest_parallel error: the environment variable I_MPI_MPIRUN is set' \
58-
f' with value "{os.getenv("I_MPI_MPIRUN")}"' \
59-
' while pytest was invoked with "--scheduler=slurm".\n' \
60-
' This indicates that pytest was run through MPI, and SLURM generally does not like that.\n' \
61-
' With "--scheduler=slurm", just run `pytest` directly, not through `mpirun/mpiexec/srun`,\n' \
62-
' because it will launch MPI itself (you may want to use --n-workers=<number of processes>).'
59+
if os.getenv('I_MPI_MPIRUN') is not None:
60+
err_msg = 'Internal pytest_parallel error: the environment variable I_MPI_MPIRUN is set' \
61+
f' (it has value "{os.getenv("I_MPI_MPIRUN")}"),\n' \
62+
' while pytest was invoked with "--scheduler=slurm".\n' \
63+
' This indicates that pytest was run through MPI, and SLURM generally does not like that.\n' \
64+
' With "--scheduler=slurm", just run `pytest` directly, not through `mpirun/mpiexec/srun`,\n' \
65+
' because it will launch MPI itself (you may want to use --n-workers=<number of processes>).'
66+
raise PytestParallelError(err_msg)
6367

6468
r = subprocess.run(['env','--null'], stdout=subprocess.PIPE) # `--null`: end each output line with NUL, required by `sbatch --export-file`
6569

66-
assert r.returncode==0, 'SLURM scheduler: error when writing `env` to `pytest_slurm/env_vars.sh`'
70+
assert r.returncode==0, 'Internal pytest_parallel SLURM schedule error: error when writing `env` to `pytest_slurm/env_vars.sh`'
6771
pytest._pytest_parallel_env_vars = r.stdout
6872

6973
# --------------------------------------------------------------------------
@@ -95,40 +99,50 @@ def pytest_configure(config):
9599
n_workers = config.getoption('n_workers')
96100
slurm_options = config.getoption('slurm_options')
97101
slurm_srun_options = config.getoption('slurm_srun_options')
98-
slurm_additional_cmds = config.getoption('slurm_additional_cmds')
102+
slurm_init_cmds = config.getoption('slurm_init_cmds')
99103
is_worker = config.getoption('_worker')
100104
slurm_file = config.getoption('slurm_file')
101105
slurm_export_env = config.getoption('slurm_export_env')
102-
slurm_sub_command = config.getoption('slurm_sub_command')
103106
detach = config.getoption('detach')
104107
if scheduler != 'slurm' and scheduler != 'shell':
105-
assert not is_worker, 'Option `--slurm-worker` only available when `--scheduler=slurm` or `--scheduler=shell`'
108+
assert not is_worker, f'Internal pytest_parallel error: `--_worker` not available with `--scheduler={scheduler}`'
106109
if (scheduler == 'slurm' or scheduler == 'shell') and not is_worker:
107-
assert n_workers, f'You need to specify `--n-workers` when `--scheduler={scheduler}`'
110+
if n_workers is None:
111+
raise PytestParallelError(f'You need to specify `--n-workers` when `--scheduler={scheduler}`')
108112
if scheduler != 'slurm':
109-
assert not slurm_options, 'Option `--slurm-options` only available when `--scheduler=slurm`'
110-
assert not slurm_srun_options, 'Option `--slurms-run-options` only available when `--scheduler=slurm`'
111-
assert not slurm_additional_cmds, 'Option `--slurm-additional-cmds` only available when `--scheduler=slurm`'
112-
assert not slurm_file, 'Option `--slurm-file` only available when `--scheduler=slurm`'
113+
if slurm_options is not None:
114+
raise PytestParallelError('Option `--slurm-options` only available when `--scheduler=slurm`')
115+
if slurm_srun_options is not None:
116+
raise PytestParallelError('Option `--slurms-run-options` only available when `--scheduler=slurm`')
117+
if slurm_init_cmds is not None:
118+
raise PytestParallelError('Option `--slurm-init-cmds` only available when `--scheduler=slurm`')
119+
if slurm_file is not None:
120+
raise PytestParallelError('Option `--slurm-file` only available when `--scheduler=slurm`')
113121

114122
if (scheduler == 'shell' or scheduler == 'slurm') and not is_worker:
115123
from mpi4py import MPI
116-
assert MPI.COMM_WORLD.size == 1, 'Do not launch `pytest_parallel` on more that one process\n' \
117-
'when `--scheduler=shell` or `--scheduler=slurm`.\n' \
118-
'`pytest_parallel` spawns MPI processes itself.\n' \
119-
f'You may want to use --n-workers={MPI.COMM_WORLD.size}.'
124+
if MPI.COMM_WORLD.size != 1:
125+
err_msg = 'Do not launch `pytest_parallel` on more than one process when `--scheduler=shell` or `--scheduler=slurm`.\n' \
126+
'`pytest_parallel` will spawn MPI processes itself.\n' \
127+
f'You may want to use --n-workers={MPI.COMM_WORLD.size}.'
128+
raise PytestParallelError(err_msg)
120129

121130

122131

123132
if scheduler == 'slurm' and not is_worker:
124-
assert slurm_options or slurm_file, 'You need to specify either `--slurm-options` or `--slurm-file` when `--scheduler=slurm`'
133+
if slurm_options is None and slurm_file is None:
134+
raise PytestParallelError('You need to specify either `--slurm-options` or `--slurm-file` when `--scheduler=slurm`')
125135
if slurm_options:
126-
assert not slurm_file, 'You need to specify either `--slurm-options` or `--slurm-file`, but not both'
136+
if slurm_file:
137+
raise PytestParallelError('You need to specify either `--slurm-options` or `--slurm-file`, but not both')
127138
if slurm_file:
128-
assert not slurm_options, 'You need to specify either `--slurm-options` or `--slurm-file`, but not both'
129-
assert not slurm_additional_cmds, 'You cannot specify `--slurm-additional-cmds` together with `--slurm-file`'
139+
if slurm_options:
140+
raise PytestParallelError('You need to specify either `--slurm-options` or `--slurm-file`, but not both')
141+
if slurm_init_cmds:
142+
raise PytestParallelError('You cannot specify `--slurm-init-cmds` together with `--slurm-file`')
130143

131-
assert '-n=' not in slurm_options and '--ntasks=' not in slurm_options, 'Do not specify `-n/--ntasks` in `--slurm-options` (it is deduced from the `--n-worker` value).'
144+
if '-n=' in slurm_options or '--ntasks=' in slurm_options:
145+
raise PytestParallelError('Do not specify `-n/--ntasks` in `--slurm-options` (it is deduced from the `--n-workers` value).')
132146

133147
from .slurm_scheduler import SlurmScheduler
134148

@@ -143,12 +157,11 @@ def pytest_configure(config):
143157
main_invoke_params = main_invoke_params.replace(file_or_dir, '')
144158
slurm_option_list = slurm_options.split() if slurm_options is not None else []
145159
slurm_conf = {
146-
'options' : slurm_option_list,
147-
'srun_options' : slurm_srun_options,
148-
'additional_cmds': slurm_additional_cmds,
149-
'file' : slurm_file,
150-
'export_env' : slurm_export_env,
151-
'sub_command' : slurm_sub_command,
160+
'options' : slurm_option_list,
161+
'srun_options': slurm_srun_options,
162+
'init_cmds' : slurm_init_cmds,
163+
'file' : slurm_file,
164+
'export_env' : slurm_export_env,
152165
}
153166
plugin = SlurmScheduler(main_invoke_params, n_workers, slurm_conf, detach)
154167

pytest_parallel/send_report.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,26 @@ def _file_path(when):
2626
test_info = {'test_idx': args._test_idx, 'fatal_error': None} # TODO no fatal_error=None (absence means no error)
2727

2828
# 'fatal_error' file
29-
try:
30-
file_path = _file_path('fatal_error')
29+
file_path = _file_path('fatal_error')
30+
if file_path.exists():
3131
with open(file_path, 'r') as file:
3232
fatal_error = file.read()
3333
test_info['fatal_error'] = fatal_error
34-
except FileNotFoundError: # There was no fatal error
35-
pass
3634

3735

3836
# 'setup/call/teardown' files
3937
already_failed = False
4038
for when in ('setup', 'call', 'teardown'):
41-
try:
42-
file_path = _file_path(when)
43-
with open(file_path, 'rb') as file:
44-
report_info = file.read()
45-
report_info = pickle.loads(report_info)
46-
test_info[when] = report_info
47-
except FileNotFoundError: # Supposedly not found because the test crashed before writing the file
39+
file_path = _file_path(when)
40+
if file_path.exists():
41+
try:
42+
with open(file_path, 'rb') as file:
43+
report_info = file.read()
44+
report_info = pickle.loads(report_info)
45+
test_info[when] = report_info
46+
except pickle.PickleError:
47+
test_info['fatal_error'] = f'FATAL ERROR in pytest_parallel : unable to decode {file_path}'
48+
else: # Supposedly not found because the test crashed before writing the file
4849
collect_longrepr = []
4950
msg = f'Error: the test crashed. '
5051
red = 31
@@ -63,8 +64,6 @@ def _file_path(when):
6364
'duration': 0, } # unable to report accurately
6465

6566
already_failed = True
66-
except pickle.PickleError:
67-
test_info['fatal_error'] = f'FATAL ERROR in pytest_parallel : unable to decode {file_path}'
6867

6968

7069
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

pytest_parallel/shell_static_scheduler.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
import socket
66
import pickle
77
from pathlib import Path
8-
from .utils.socket import recv as socket_recv, setup_socket
8+
from .utils.socket import recv as socket_recv
9+
from .utils.socket import setup_socket
910
from .utils.items import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index, mark_skip
1011
from .utils.file import remove_exotic_chars, create_folders
1112
from .algo import partition
1213
from .static_scheduler_utils import group_items_by_parallel_steps
1314
from mpi4py import MPI
14-
import numpy as np
1515

1616
def mpi_command(current_proc, n_proc):
1717
mpi_vendor = MPI.get_vendor()[0]
@@ -43,7 +43,7 @@ def submit_items(items_to_run, SCHEDULER_IP_ADDRESS, port, session_folder, main_
4343
cmd += mpi_command(current_proc, item.n_proc)
4444
cmd += f' python3 -u -m pytest -s --_worker {socket_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}'
4545
cmd += f' > {test_out_file} 2>&1'
46-
cmd += f' ; python -m pytest_parallel.send_report {socket_flags} --_test_idx={test_idx} --_test_name={test_out_file}'
46+
cmd += f' ; python3 -m pytest_parallel.send_report {socket_flags} --_test_idx={test_idx} --_test_name={test_out_file}'
4747
cmd += ')'
4848
cmds.append(cmd)
4949
current_proc += item.n_proc
@@ -74,18 +74,17 @@ def submit_items(items_to_run, SCHEDULER_IP_ADDRESS, port, session_folder, main_
7474

7575
def receive_items(items, session, socket, n_item_to_recv):
7676
# > Precondition: Items must keep their original order to pick up the right item at the reception
77-
original_indices = np.array([item.original_index for item in items])
78-
assert (original_indices==np.arange(len(items))).all()
77+
original_indices = [item.original_index for item in items]
78+
assert original_indices==list(range(len(items)))
7979

8080
while n_item_to_recv>0:
8181
conn, addr = socket.accept()
8282
with conn:
8383
msg = socket_recv(conn)
8484
test_info = pickle.loads(msg) # the worker is supposed to have send a dict with the correct structured information
85-
#print(f"{test_info=}")
8685
if 'signal_info' in test_info:
8786
print('signal_info= ',test_info['signal_info'])
88-
break;
87+
break
8988
else:
9089
test_idx = test_info['test_idx']
9190
if test_info['fatal_error'] is not None:

pytest_parallel/slurm_scheduler.py

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,12 @@
33
import socket
44
import pickle
55
from pathlib import Path
6-
from .utils.socket import recv as socket_recv, setup_socket
6+
from .utils.socket import recv as socket_recv
7+
from .utils.socket import setup_socket
78
from .utils.items import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index, mark_skip
89
from .utils.file import remove_exotic_chars, create_folders
910
from .algo import partition
1011

11-
def parse_job_id_from_submission_output(s):
12-
# At this point, we are trying to guess -_-
13-
# Here we supposed that the command for submitting the job
14-
# returned string with only one number,
15-
# and that this number is the job id
16-
import re
17-
return int(re.search(r'\d+', str(s)).group())
18-
1912
def submit_items(items_to_run, socket, session_folder, main_invoke_params, ntasks, slurm_conf):
2013
SCHEDULER_IP_ADDRESS, port = setup_socket(socket)
2114

@@ -45,8 +38,8 @@ def submit_items(items_to_run, socket, session_folder, main_invoke_params, ntask
4538
srun_options = ''
4639
socket_flags = f"--_scheduler_ip_address={SCHEDULER_IP_ADDRESS} --_scheduler_port={port} --_session_folder={session_folder}"
4740
cmds = ''
48-
if slurm_conf['additional_cmds'] is not None:
49-
cmds += slurm_conf['additional_cmds'] + '\n'
41+
if slurm_conf['init_cmds'] is not None:
42+
cmds += slurm_conf['init_cmds'] + '\n'
5043
for item in items:
5144
test_idx = item.original_index
5245
test_out_file = f'.pytest_parallel/{session_folder}/{remove_exotic_chars(item.nodeid)}'
@@ -58,7 +51,7 @@ def submit_items(items_to_run, socket, session_folder, main_invoke_params, ntask
5851
cmd += ' -l' #
5952
cmd += f' python3 -u -m pytest -s --_worker {socket_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}'
6053
cmd += f' > {test_out_file} 2>&1'
61-
cmd += f' ; python -m pytest_parallel.send_report {socket_flags} --_test_idx={test_idx} --_test_name={test_out_file}'
54+
cmd += f' ; python3 -m pytest_parallel.send_report {socket_flags} --_test_idx={test_idx} --_test_name={test_out_file}'
6255
cmd += ')'
6356
cmd += ' &\n' # launch everything in parallel
6457
cmds += cmd
@@ -73,23 +66,17 @@ def submit_items(items_to_run, socket, session_folder, main_invoke_params, ntask
7366
with open(f'.pytest_parallel/{session_folder}/env_vars.sh','wb') as f:
7467
f.write(pytest._pytest_parallel_env_vars)
7568

76-
if slurm_conf['sub_command'] is None:
77-
if slurm_conf['export_env']:
78-
sbatch_cmd = f'sbatch --parsable --export-file=.pytest_parallel/{session_folder}/env_vars.sh .pytest_parallel/{session_folder}/job.sh'
79-
else:
80-
sbatch_cmd = f'sbatch --parsable .pytest_parallel/{session_folder}/job.sh'
69+
if slurm_conf['export_env']:
70+
sbatch_cmd = f'sbatch --parsable --export-file=.pytest_parallel/{session_folder}/env_vars.sh .pytest_parallel/{session_folder}/job.sh'
8171
else:
82-
sbatch_cmd = slurm_conf['sub_command'] + ' .pytest_parallel/{session_folder}/job.sh'
72+
sbatch_cmd = f'sbatch --parsable .pytest_parallel/{session_folder}/job.sh'
8373

8474
p = subprocess.Popen([sbatch_cmd], shell=True, stdout=subprocess.PIPE)
8575
print('\nSubmitting tests to SLURM...')
8676
returncode = p.wait()
8777
assert returncode==0, f'Error when submitting to SLURM with `{sbatch_cmd}`'
8878

89-
if slurm_conf['sub_command'] is None:
90-
slurm_job_id = int(p.stdout.read())
91-
else:
92-
slurm_job_id = parse_job_id_from_submission_output(p.stdout.read())
79+
slurm_job_id = int(p.stdout.read())
9380

9481
print(f'SLURM job {slurm_job_id} has been submitted')
9582
return slurm_job_id

pytest_parallel/utils/file.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ def remove_exotic_chars(s):
1313

1414
def create_folders():
1515
Path('.pytest_parallel').mkdir(exist_ok=True)
16-
session_folder_abs = Path(tempfile.mkdtemp(dir='.pytest_parallel'))
16+
session_folder_abs = Path(tempfile.mkdtemp(dir='.pytest_parallel')) # create a folder that did not already exist
1717
Path(session_folder_abs/'_partial').mkdir()
1818
return session_folder_abs.name

0 commit comments

Comments
 (0)