Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions stestr/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os.path
import subprocess
import sys
import warnings

from cliff import command
import subunit
Expand Down Expand Up @@ -245,6 +246,12 @@
help="If set, show non-text attachments. This is "
"generally only useful for debug purposes.",
)
parser.add_argument(
"--dynamic",
action="store_true",
default=False,
help="Enable the EXPERIMENTAL dynamic scheduler",
)
return parser

def take_action(self, parsed_args):
Expand Down Expand Up @@ -336,6 +343,7 @@
all_attachments=all_attachments,
show_binary_attachments=args.show_binary_attachments,
pdb=args.pdb,
dynamic=args.dynamic,
)

# Always output slowest test info if requested, regardless of other
Expand Down Expand Up @@ -397,6 +405,7 @@
all_attachments=False,
show_binary_attachments=True,
pdb=False,
dynamic=False,
):
"""Function to execute the run command

Expand Down Expand Up @@ -461,6 +470,7 @@
:param str pdb: Takes in a single test_id to bypasses test
discover and just execute the test specified without launching any
additional processes. A file name may be used in place of a test name.
:param bool dynamic: Enable dynamic scheduling

:return return_code: The exit code for the command. 0 for success and > 0
for failures.
Expand Down Expand Up @@ -502,6 +512,11 @@
)
stdout.write(msg)
return 2
if dynamic:
warnings.warn(

Check warning on line 516 in stestr/commands/run.py

View check run for this annotation

Codecov / codecov/patch

stestr/commands/run.py#L516

Added line #L516 was not covered by tests
"WARNING: The dynamic scheduler is still experimental. "
"You might encounter issues while using it"
)
if combine:
latest_id = repo.latest_id()
combine_id = str(latest_id)
Expand Down Expand Up @@ -547,7 +562,8 @@
(
"subunit",
output.ReturnCodeToSubunit(
subprocess.Popen(run_cmd, shell=True, stdout=subprocess.PIPE)
subprocess.Popen(run_cmd, shell=True, stdout=subprocess.PIPE),
dynamic=False,
),
)
]
Expand Down Expand Up @@ -654,6 +670,7 @@
top_dir=top_dir,
test_path=test_path,
randomize=random,
dynamic=dynamic,
)
if isolated:
result = 0
Expand Down Expand Up @@ -693,6 +710,7 @@
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments,
dynamic=dynamic,
)
if run_result > result:
result = run_result
Expand All @@ -711,6 +729,7 @@
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments,
dynamic=dynamic,
)
else:
# Where do we source data about the cause of conflicts.
Expand Down Expand Up @@ -780,16 +799,28 @@
suppress_attachments=False,
all_attachments=False,
show_binary_attachments=False,
dynamic=False,
):
"""Run the tests cmd was parameterised with."""
cmd.setUp()
try:

def run_tests():
run_procs = [
("subunit", output.ReturnCodeToSubunit(proc))
for proc in cmd.run_tests()
]
if not dynamic or cmd.concurrency == 1:
run_procs = [
("subunit", output.ReturnCodeToSubunit(proc, dynamic=False))
for proc in cmd.run_tests()
]
else:
run_procs = [

Check warning on line 815 in stestr/commands/run.py

View check run for this annotation

Codecov / codecov/patch

stestr/commands/run.py#L815

Added line #L815 was not covered by tests
(
"subunit",
output.ReturnCodeToSubunit(
os.fdopen(proc["stream"]), proc["proc"]
),
)
for proc in cmd.run_tests()
]
if not run_procs:
stdout.write("The specified regex doesn't match with anything")
return 1
Expand Down
3 changes: 3 additions & 0 deletions stestr/config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def get_run_command(
exclude_regex=None,
randomize=False,
parallel_class=None,
dynamic=False,
):
"""Get a test_processor.TestProcessorFixture for this config file

Expand Down Expand Up @@ -158,6 +159,7 @@ def get_run_command(
stestr scheduler by class. If both this and the corresponding
config file option which includes `group-regex` are set, this value
will be used.
:param bool dynamic: Enable dynamic scheduling

:returns: a TestProcessorFixture object for the specified config file
and any arguments passed into this function
Expand Down Expand Up @@ -236,4 +238,5 @@ def group_callback(test_id, regex=re.compile(group_regex)):
exclude_regex=exclude_regex,
include_list=include_list,
randomize=randomize,
dynamic=dynamic,
)
20 changes: 16 additions & 4 deletions stestr/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,33 @@
generating subunit.
"""

def __init__(self, process):
def __init__(self, process, thread=None, dynamic=True):
"""Adapt a process to a readable stream."""
self.proc = process
self.done = False
self.source = self.proc.stdout
if dynamic:
self.source = process
self.proc = thread

Check warning on line 173 in stestr/output.py

View check run for this annotation

Codecov / codecov/patch

stestr/output.py#L172-L173

Added lines #L172 - L173 were not covered by tests
else:
self.source = self.proc.stdout
self.dynamic = dynamic
self.lastoutput = bytes((b"\n")[0])

def __del__(self):
self.proc.wait()
if hasattr(self.proc, "wait"):
self.proc.wait()
else:
self.proc.join()

Check warning on line 183 in stestr/output.py

View check run for this annotation

Codecov / codecov/patch

stestr/output.py#L183

Added line #L183 was not covered by tests

def _append_return_code_as_test(self):
if self.done is True:
return
self.source = io.BytesIO()
returncode = self.proc.wait()
if not self.dynamic:
returncode = self.proc.wait()

Check warning on line 190 in stestr/output.py

View check run for this annotation

Codecov / codecov/patch

stestr/output.py#L190

Added line #L190 was not covered by tests
else:
self.proc.join()
returncode = self.proc.exitcode

Check warning on line 193 in stestr/output.py

View check run for this annotation

Codecov / codecov/patch

stestr/output.py#L192-L193

Added lines #L192 - L193 were not covered by tests
if returncode != 0:
if self.lastoutput != bytes((b"\n")[0]):
# Subunit V1 is line orientated, it has to start on a fresh
Expand Down
68 changes: 68 additions & 0 deletions stestr/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,74 @@
from stestr import selection


def get_dynamic_test_list(
test_ids, repository=None, group_callback=None, randomize=False
):
dynamic_test_list = []
_group_callback = group_callback
time_data = {}

Check warning on line 29 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L27-L29

Added lines #L27 - L29 were not covered by tests
if randomize:
return random.shuffle(test_ids)

Check warning on line 31 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L31

Added line #L31 was not covered by tests
if repository:
time_data = repository.get_test_times(test_ids)
timed_tests = time_data["known"]
unknown_tests = time_data["unknown"]

Check warning on line 35 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L33-L35

Added lines #L33 - L35 were not covered by tests
else:
timed_tests = {}
unknown_tests = set(test_ids)

Check warning on line 38 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L37-L38

Added lines #L37 - L38 were not covered by tests
# Group tests: generate group_id -> test_ids.
group_ids = collections.defaultdict(list)

Check warning on line 40 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L40

Added line #L40 was not covered by tests
if _group_callback is None:

def group_callback(_):
return None

Check warning on line 44 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L43-L44

Added lines #L43 - L44 were not covered by tests

else:
group_callback = _group_callback

Check warning on line 47 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L47

Added line #L47 was not covered by tests
for test_id in test_ids:
group_id = group_callback(test_id) or test_id
group_ids[group_id].append(test_id)

Check warning on line 50 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L49-L50

Added lines #L49 - L50 were not covered by tests
# Time groups: generate three sets of groups:
# - fully timed dict(group_id -> time),
# - partially timed dict(group_id -> time) and
# - unknown (set of group_id)
# We may in future treat partially timed different for scheduling, but
# at least today we just schedule them after the fully timed groups.
timed = {}
partial = {}
unknown = []

Check warning on line 59 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L57-L59

Added lines #L57 - L59 were not covered by tests
for group_id, group_tests in group_ids.items():
untimed_ids = unknown_tests.intersection(group_tests)
group_time = sum(

Check warning on line 62 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L61-L62

Added lines #L61 - L62 were not covered by tests
[
timed_tests[test_id]
for test_id in untimed_ids.symmetric_difference(group_tests)
]
)
if not untimed_ids:
timed[group_id] = group_time

Check warning on line 69 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L69

Added line #L69 was not covered by tests
elif group_time:
partial[group_id] = group_time

Check warning on line 71 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L71

Added line #L71 was not covered by tests
else:
unknown.append(group_id)

Check warning on line 73 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L73

Added line #L73 was not covered by tests

# Scheduling is NP complete in general, so we avoid aiming for
# perfection. A quick approximation that is sufficient for our general
# needs:
# sort the groups by time
# allocate to partitions by putting each group in to the partition with
# the current (lowest time, shortest length[in tests])
def consume_queue(groups):
queue = sorted(groups.items(), key=operator.itemgetter(1), reverse=True)
dynamic_test_list.extend([group[0] for group in queue])

Check warning on line 83 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L81-L83

Added lines #L81 - L83 were not covered by tests

consume_queue(timed)
consume_queue(partial)
dynamic_test_list.extend(unknown)

Check warning on line 87 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L85-L87

Added lines #L85 - L87 were not covered by tests

return dynamic_test_list

Check warning on line 89 in stestr/scheduler.py

View check run for this annotation

Codecov / codecov/patch

stestr/scheduler.py#L89

Added line #L89 was not covered by tests


def partition_tests(test_ids, concurrency, repository, group_callback, randomize=False):
"""Partition test_ids by concurrency.

Expand Down
81 changes: 67 additions & 14 deletions stestr/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.

import functools
import io
import multiprocessing
import os
import re
import signal
Expand All @@ -24,6 +26,8 @@
from stestr import results
from stestr import scheduler
from stestr import selection
from stestr.subunit_runner import program
from stestr.subunit_runner import run
from stestr import testlist


Expand Down Expand Up @@ -94,6 +98,7 @@
exclude_regex=None,
include_list=None,
randomize=False,
dynamic=False,
):
"""Create a TestProcessorFixture."""

Expand All @@ -115,6 +120,7 @@
self.include_list = include_list
self.exclude_regex = exclude_regex
self.randomize = randomize
self.dynamic = dynamic

def setUp(self):
super().setUp()
Expand Down Expand Up @@ -249,6 +255,31 @@
ids = testlist.parse_enumeration(out)
return ids

def _dynamic_run_tests(self, job_queue, subunit_pipe):
while True:

Check warning on line 259 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L259

Added line #L259 was not covered by tests
# NOTE(mtreinish): Open on each loop iteration with a dup to
# remove the chance of being garbage collected. Without this
# you'll be fighting random Bad file desciptor errors
subunit_pipe = os.fdopen(os.dup(subunit_pipe.fileno()), "wb")

Check warning on line 263 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L263

Added line #L263 was not covered by tests
if job_queue.empty():
subunit_pipe.close()
return
try:
test_id = job_queue.get(block=False)
except Exception:
subunit_pipe.close()
return

Check warning on line 271 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L265-L271

Added lines #L265 - L271 were not covered by tests
if not test_id:
os.close(subunit_pipe.fileno())
raise ValueError("Invalid blank test_id: %s" % test_id)
cmd_list = [self.cmd, test_id]
test_runner = run.SubunitTestRunner
program.TestProgram(

Check warning on line 277 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L273-L277

Added lines #L273 - L277 were not covered by tests
module=None,
argv=cmd_list,
testRunner=functools.partial(test_runner, stdout=subunit_pipe),
)

def run_tests(self):
"""Run the tests defined by the command

Expand Down Expand Up @@ -280,19 +311,41 @@
test_id_groups = scheduler.partition_tests(
test_ids, self.concurrency, self.repository, self._group_callback
)
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
continue
fixture = self.useFixture(
TestProcessorFixture(
test_ids,
self.template,
self.listopt,
self.idoption,
self.repository,
parallel=False,
if not self.dynamic:
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
continue
fixture = self.useFixture(
TestProcessorFixture(
test_ids,
self.template,
self.listopt,
self.idoption,
self.repository,
parallel=False,
)
)
result.extend(fixture.run_tests())
return result
else:
test_id_list = scheduler.get_dynamic_test_list(

Check warning on line 332 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L332

Added line #L332 was not covered by tests
test_ids, self.repository, self._group_callback
)
result.extend(fixture.run_tests())
return result
test_list = multiprocessing.Queue()

Check warning on line 335 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L335

Added line #L335 was not covered by tests

for test_id in test_id_list:
test_list.put(test_id)

Check warning on line 338 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L338

Added line #L338 was not covered by tests

for i in range(self.concurrency):
fd_pipe_r, fd_pipe_w = multiprocessing.Pipe(False)
name = "worker-%s" % i
proc = multiprocessing.Process(

Check warning on line 343 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L341-L343

Added lines #L341 - L343 were not covered by tests
target=self._dynamic_run_tests,
name=name,
args=(test_list, fd_pipe_w),
)
proc.start()
stream_read = os.dup(fd_pipe_r.fileno())
result.append({"stream": stream_read, "proc": proc})
return result

Check warning on line 351 in stestr/test_processor.py

View check run for this annotation

Codecov / codecov/patch

stestr/test_processor.py#L348-L351

Added lines #L348 - L351 were not covered by tests
1 change: 1 addition & 0 deletions stestr/tests/test_config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def _check_get_run_command(
exclude_regex=None,
exclude_list=None,
concurrency=0,
dynamic=False,
group_callback=expected_group_callback,
test_filters=None,
randomize=False,
Expand Down
1 change: 0 additions & 1 deletion stestr/tests/test_return_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
"%s" % cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = p.communicate()

if not subunit:
self.assertEqual(
p.returncode,
Expand Down
Loading
Loading