
Commit 52f3ead

fix import error not being reported correctly in the worker (for process-isolate schedulers)
1 parent ca026ea commit 52f3ead


2 files changed: +137 −27 lines changed


pytest_parallel/process_worker.py

Lines changed: 50 additions & 6 deletions
@@ -19,6 +19,49 @@ def __init__(self, scheduler_ip_address, scheduler_port, session_folder, test_id
     def _file_path(self, when):
         return Path(f'.pytest_parallel/{self.session_folder}/_partial/{self.test_idx}_{when}')
 
+    @pytest.hookimpl(tryfirst=True)
+    def pytest_make_collect_report(self, collector):
+        comm = MPI.COMM_WORLD
+
+        # Here, we are just before the call to the `pytest_make_collect_report` function of pytest.
+        # The pytest `pytest_make_collect_report` function is the one that imports the test,
+        # so it is possible that it crashes (e.g. forced exit in a module imported by the test).
+        # Hence, before calling it, we want to register the fact that we at least reached this point.
+        if comm.rank == 0:
+            file_path = self._file_path('before_import')
+            with open(file_path, 'w', encoding='utf-8') as f:
+                f.write('before import')
+
+    @pytest.hookimpl(tryfirst=True)
+    def pytest_collectreport(self, report):
+        comm = MPI.COMM_WORLD
+
+        if comm.rank == 0:
+            collect_file = self._file_path('collect')
+
+            # For an unknown reason, `pytest_collectreport` is called several times.
+            # However, we only use one 'collect' file,
+            # so we need to create the file if it has not been created yet.
+            # If the file already exists, then we overwrite it if it previously passed and we now fail.
+            if not collect_file.exists():
+                do_report = True
+            else:
+                with open(collect_file, 'rb') as file:
+                    previous_report_info = file.read()
+                previous_report_info = pickle.loads(previous_report_info)
+                if previous_report_info['outcome'] == 'passed' and report.outcome == 'failed':
+                    do_report = True
+                else:
+                    do_report = False
+
+            if do_report:
+                report_info = {'outcome' : report.outcome,
+                               'longrepr': report.longrepr,
+                               'duration': 0., }
+                with open(collect_file, 'wb') as f:
+                    f.write(pickle.dumps(report_info))
+
+
     @pytest.hookimpl(tryfirst=True)
     def pytest_runtestloop(self, session) -> bool:
         comm = MPI.COMM_WORLD
@@ -27,12 +70,12 @@ def pytest_runtestloop(self, session) -> bool:
         test_comm_size = get_n_proc_for_test(item)
 
         item.sub_comm = comm
-        item.test_info = {'test_idx': self.test_idx, 'fatal_error': None}
+        item.test_info = {'test_idx': self.test_idx, 'fatal_error': None} # TODO 2025-07 not used, remove
 
 
-        # check there is no file from a previous run
+        # check there is no file from a previous run # TODO move this check up the pytest workflow, and complete it with other files
         if comm.rank == 0:
-            for when in ['fatal_error', 'setup', 'call', 'teardown']:
+            for when in ['pre_run_error', 'setup', 'call', 'teardown']:
                 path = self._file_path(when)
                 assert not path.exists(), f'INTERNAL FATAL ERROR in pytest_parallel: file "{path}" should not exist at this point'
 
@@ -42,7 +85,7 @@ def pytest_runtestloop(self, session) -> bool:
             error_info = f'FATAL ERROR in pytest_parallel with slurm scheduling: test `{item.nodeid}`' \
                          f' uses a `comm` of size {test_comm_size} but was launched with size {comm.size}.\n' \
                          f' This generally indicates that `srun` does not interoperate correctly with MPI.'
-            file_path = self._file_path('fatal_error')
+            file_path = self._file_path('pre_run_error')
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(error_info)
            return True
@@ -51,6 +94,7 @@ def pytest_runtestloop(self, session) -> bool:
         nextitem = None
         run_item_test(item, nextitem, session)
 
+        # TODO 2025-07 not used, remove
         if item.test_info['fatal_error'] is not None:
             assert 0, f'{item.test_info["fatal_error"]}'
 
@@ -61,12 +105,12 @@ def pytest_runtest_makereport(self, item):
         """
         We need to hook to pass the test sub-comm to `pytest_runtest_logreport`,
         and for that we add the sub-comm to the only argument of `pytest_runtest_logreport`, that is, `report`
-        We also need to pass `item.test_info` so that we can update it
+        We also need to pass `item.test_info` so that we can update it # TODO 2025-07 not used, remove
         """
         result = yield
         report = result.get_result()
         report.sub_comm = item.sub_comm
-        report.test_info = item.test_info
+        report.test_info = item.test_info # TODO 2025-07 not used, remove
 
     @pytest.hookimpl(tryfirst=True)
     def pytest_runtest_logreport(self, report):
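
Because `pytest_collectreport` can fire more than once for the same worker, the hook above only rewrites the 'collect' file when a previously recorded 'passed' outcome has to be downgraded to 'failed'. The snippet below is a minimal standalone sketch of that merge rule; the helper names `should_overwrite` and `record_collect_outcome` are illustrative only and are not part of pytest_parallel.

# Sketch of the "only overwrite passed -> failed" rule used for the 'collect' file.
# Helper names are illustrative, not pytest_parallel API.
import pickle
from pathlib import Path


def should_overwrite(collect_file: Path, new_outcome: str) -> bool:
    # Write the first report; afterwards, only replace a 'passed' report with a 'failed' one.
    if not collect_file.exists():
        return True
    previous = pickle.loads(collect_file.read_bytes())
    return previous['outcome'] == 'passed' and new_outcome == 'failed'


def record_collect_outcome(collect_file: Path, outcome: str, longrepr=None) -> None:
    if should_overwrite(collect_file, outcome):
        report_info = {'outcome': outcome, 'longrepr': longrepr, 'duration': 0.}
        collect_file.write_bytes(pickle.dumps(report_info))


if __name__ == '__main__':
    import tempfile
    with tempfile.TemporaryDirectory() as d:
        f = Path(d) / '0_collect'
        record_collect_outcome(f, 'passed')   # first call creates the file
        record_collect_outcome(f, 'failed')   # downgrades passed -> failed
        record_collect_outcome(f, 'passed')   # ignored: a failure is never overwritten
        print(pickle.loads(f.read_bytes())['outcome'])  # prints 'failed'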

pytest_parallel/send_report.py

Lines changed: 87 additions & 21 deletions
@@ -23,48 +23,114 @@
 def _file_path(when):
     return Path(f'.pytest_parallel/{args._session_folder}/_partial/{args._test_idx}_{when}')
 
-test_info = {'test_idx': args._test_idx, 'fatal_error': None} # TODO no fatal_error=None (absence means no error)
+def _longrepr_from_str(msg):
+    trace_back = ReprTraceback([ReprEntryNative(msg)], None, None)
+    collect_longrepr = []
+    collect_longrepr.append(
+        (trace_back, None, None)
+    )
+    return ExceptionChainRepr(collect_longrepr)
 
-# 'fatal_error' file
-file_path = _file_path('fatal_error')
-if file_path.exists():
-    with open(file_path, 'r', encoding='utf-8') as file:
-        fatal_error = file.read()
-    test_info['fatal_error'] = fatal_error
 
+def _fill_test_info_from_report(test_info, when):
+    assert when in ['setup', 'call', 'teardown']
 
-# 'setup/call/teardown' files
-already_failed = False
-for when in ('setup', 'call', 'teardown'):
     file_path = _file_path(when)
     if file_path.exists():
         try:
             with open(file_path, 'rb') as file:
                 report_info = file.read()
             report_info = pickle.loads(report_info)
             test_info[when] = report_info
+            failed = report_info['outcome'] == 'failed'
         except pickle.PickleError:
             test_info['fatal_error'] = f'FATAL ERROR in pytest_parallel : unable to decode {file_path}'
+            failed = True
     else: # Supposedly not found because the test crashed before writing the file
-        collect_longrepr = []
-        msg = 'Error: the test crashed. '
+        msg = f'Error: the test crashed during `{when}` phase. '
         red = 31
         bold = 1
         msg = f'\x1b[{red}m' + f'\x1b[{bold}m' + msg + '\x1b[0m'
         msg += f'Log file: {args._test_name}\n'
-        trace_back = ReprTraceback([ReprEntryNative(msg)], None, None)
-        collect_longrepr.append(
-            (trace_back, None, None)
-        )
-        longrepr = ExceptionChainRepr(collect_longrepr)
-
-        outcome = 'passed' if already_failed else 'failed' # No need to report the error twice
-        test_info[when] = {'outcome' : outcome,
+        longrepr = _longrepr_from_str(msg)
+
+        test_info[when] = {'outcome' : 'failed',
                            'longrepr': longrepr,
                            'duration': 0, } # unable to report accurately
 
-        already_failed = True
+        failed = True
+    return failed
+
+def _retrieve_test_info():
+    test_info = {'test_idx': args._test_idx, 'fatal_error': None} # TODO no fatal_error=None (absence means no error)
+    for when in ('setup', 'call', 'teardown'):
+        test_info[when] = {'outcome' : 'passed',
+                           'longrepr': _longrepr_from_str(''),
+                           'duration': 0, }
+
+    # During test execution, the following files are created in order:
+    #   1. before_import
+    #   2. collect
+    #   3. pre_run_error
+    #   4. setup
+    #   5. call
+    #   6. teardown
+    # If one of the files is missing, it means there was a crash (except for `pre_run_error`, where it is the other way around)
+
+    # 1. if `before_import` is not present, we crashed at the very beginning
+    if not _file_path('before_import').exists():
+        test_info['fatal_error'] = 'FATAL ERROR in pytest_parallel early processing\n'
+        test_info['fatal_error'] += f'Log file: {args._test_name}\n'
+        return test_info
+
+    # 2. handle collection
+    if not _file_path('collect').exists(): # if `collect` is not present, we crashed during the test collection
+        test_info['fatal_error'] = 'FATAL ERROR in pytest_parallel during test collection\n'
+        test_info['fatal_error'] += f'Log file: {args._test_name}\n'
+        return test_info
+    else: # else we report if the collection failed
+        with open(_file_path('collect'), 'rb') as file:
+            report_info = file.read()
+        report_info = pickle.loads(report_info)
+        if report_info['outcome'] == 'failed':
+            # Note:
+            #   We could send report_info['longrepr'] to master so that it reports it directly.
+            #   However, it would be confusing, because master also did the collection phase with no error
+            #   (if there were an error, the worker would not run in the first place).
+            #   To make it clear that the error appears on the worker only, better refer to the report of the worker.
+            msg = f'Error: the test crashed during `collect` phase. '
+            red = 31
+            bold = 1
+            msg = f'\x1b[{red}m' + f'\x1b[{bold}m' + msg + '\x1b[0m'
+            msg += f'Log file: {args._test_name}\n'
+            longrepr = _longrepr_from_str(msg)
+
+            # report as a setup failure (because indeed, the worker failed to set up the test by failing to collect it)
+            test_info['setup'] = {'outcome' : 'failed',
+                                  'longrepr': longrepr,
+                                  'duration': 0, } # unable to report accurately
+            return test_info
+
+    # 3. if `pre_run_error` is present, there was a fatal error in the pytest_parallel test handling
+    file_path = _file_path('pre_run_error')
+    if file_path.exists():
+        with open(file_path, 'r', encoding='utf-8') as file:
+            pre_run_error_msg = file.read()
+        test_info['fatal_error'] = pre_run_error_msg
+        return test_info
+
+    # 4.,5.,6.: 'setup/call/teardown' files
+    for when in ('setup', 'call', 'teardown'):
+        failed = _fill_test_info_from_report(test_info, when)
+        if failed:
+            return test_info
+
+    return test_info
+
+
+
 
+test_info = _retrieve_test_info()
 
 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
     s.connect((args._scheduler_ip_address, args._scheduler_port))
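
The new `_retrieve_test_info` walks the sentinel files in the order they are created (`before_import`, `collect`, `pre_run_error`, then `setup`/`call`/`teardown`) and stops at the first anomaly. The standalone sketch below summarizes that decision order as a small function; `classify_run`, the `present` set and the `collect_failed`/`phase_failed` flags are illustrative only and are not part of send_report.py.

# Sketch of the sentinel-file decision order; `classify_run` is illustrative,
# not the actual send_report.py code.
def classify_run(present, collect_failed=False, phase_failed=None):
    # present: set of sentinel-file names found for one test
    if 'before_import' not in present:
        return 'fatal_error: crash before importing the test'
    if 'collect' not in present:
        return 'fatal_error: crash during test collection'
    if collect_failed:
        return 'setup failed: collection failed on the worker'
    if 'pre_run_error' in present:
        return 'fatal_error: pre-run error reported by pytest_parallel'
    for when in ('setup', 'call', 'teardown'):
        if when not in present:
            return f'{when} failed: the test crashed during `{when}`'
        if phase_failed == when:
            return f'{when} failed'
    return 'passed'


# A few example situations:
print(classify_run(set()))                                               # crash before import
print(classify_run({'before_import'}))                                   # crash during collection
print(classify_run({'before_import', 'collect'}, collect_failed=True))   # collection failed on the worker
print(classify_run({'before_import', 'collect', 'setup'}))               # crashed during `call`
print(classify_run({'before_import', 'collect', 'setup', 'call', 'teardown'}))  # passed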
