diff --git a/crates/accelerate/src/lib.rs b/crates/accelerate/src/lib.rs index 1a99af4ba1fc..1076a4190df9 100644 --- a/crates/accelerate/src/lib.rs +++ b/crates/accelerate/src/lib.rs @@ -21,6 +21,43 @@ pub mod results; pub mod sampled_exp_val; pub mod twirling; pub mod uc_gate; +pub mod unitary_synthesis; +pub mod utils; +pub mod vf2_layout; +mod rayon_ext; +#[cfg(test)] +mod test; +mod unitary_compose; + +#[inline] +pub fn getenv_use_multiple_threads() -> bool { + let parallel_context = env::var("QISKIT_IN_PARALLEL") + .unwrap_or_else(|_| "FALSE".to_string()) + .to_uppercase() + == "TRUE"; + let force_threads = env::var("QISKIT_FORCE_THREADS") + .unwrap_or_else(|_| "FALSE".to_string()) + .to_uppercase() + == "TRUE"; + + let result = !parallel_context || force_threads; + + // Log threading decision if debug logging is enabled + if env::var("QISKIT_DEBUG_THREADING") + .unwrap_or_else(|_| "FALSE".to_string()) + .to_uppercase() + == "TRUE" + { + eprintln!( + "Rust threading decision: {} (parallel_context={}, force_threads={})", + if result { "MULTI_THREADED" } else { "SINGLE_THREADED" }, + parallel_context, + force_threads + ); + } + + result +} import_exception!(qiskit.exceptions, QiskitError); import_exception!(qiskit.circuit.exceptions, CircuitError); diff --git a/qiskit/passmanager/passmanager.py b/qiskit/passmanager/passmanager.py index ae62bae1b2c6..d1463225de95 100644 --- a/qiskit/passmanager/passmanager.py +++ b/qiskit/passmanager/passmanager.py @@ -21,7 +21,7 @@ import dill -from qiskit.utils.parallel import parallel_map, should_run_in_parallel +from qiskit.utils.parallel import parallel_map, should_run_in_parallel, CPU_COUNT from .base_tasks import Task, PassManagerIR from .exceptions import PassManagerError from .flow_controllers import FlowControllerLinear @@ -233,7 +233,12 @@ def callback_func(**kwargs): # If we're not going to run in parallel, we want to avoid spending time `dill` serializing # ourselves, since that can be quite expensive. - if len(in_programs) == 1 or not should_run_in_parallel(num_processes): + use_parallel = should_run_in_parallel(num_processes) + if len(in_programs) == 1 or not use_parallel: + if len(in_programs) == 1: + logger.debug("PassManager running single program serially") + else: + logger.debug("PassManager running %d programs serially (parallel disabled)", len(in_programs)) out = [ _run_workflow( program=program, @@ -251,6 +256,9 @@ def callback_func(**kwargs): del callback del kwargs + logger.debug("PassManager running %d programs in parallel with %d processes", + len(in_programs), num_processes or CPU_COUNT) + # Pass manager may contain callable and we need to serialize through dill rather than pickle. # See https://github.com/Qiskit/qiskit-terra/pull/3290 # Note that serialized object is deserialized as a different object. diff --git a/qiskit/transpiler/preset_passmanagers/builtin_plugins.py b/qiskit/transpiler/preset_passmanagers/builtin_plugins.py index 66f7d7ff9424..91c09398956f 100644 --- a/qiskit/transpiler/preset_passmanagers/builtin_plugins.py +++ b/qiskit/transpiler/preset_passmanagers/builtin_plugins.py @@ -12,8 +12,11 @@ """Built-in transpiler stage plugins for preset pass managers.""" +import logging import os +logger = logging.getLogger(__name__) + from qiskit.transpiler.passes.layout.vf2_post_layout import VF2PostLayout from qiskit.transpiler.passes.optimization.split_2q_unitaries import Split2QUnitaries from qiskit.transpiler.passmanager import PassManager @@ -1039,9 +1042,16 @@ def _swap_mapped(property_set): def _get_trial_count(default_trials=5): - if CONFIG.get("sabre_all_threads", None) or os.getenv("QISKIT_SABRE_ALL_THREADS"): - return max(default_num_processes(), default_trials) - return default_trials + use_all_threads = CONFIG.get("sabre_all_threads", None) or os.getenv("QISKIT_SABRE_ALL_THREADS") + if use_all_threads: + trial_count = max(CPU_COUNT, default_trials) + logger.debug("SABRE using all threads: %d trials (CPU_COUNT=%d, default=%d)", + trial_count, CPU_COUNT, default_trials) + return trial_count + else: + logger.debug("SABRE using default thread configuration: %d trials", default_trials) + return default_trials + class CliffordTOptimizationPassManager(PassManagerStagePlugin): diff --git a/qiskit/utils/parallel.py b/qiskit/utils/parallel.py index 79d40d141215..731a639f515c 100644 --- a/qiskit/utils/parallel.py +++ b/qiskit/utils/parallel.py @@ -54,9 +54,11 @@ from __future__ import annotations +import logging import contextlib import functools import multiprocessing + import os import platform import sys @@ -65,6 +67,8 @@ from qiskit import user_config +logger = logging.getLogger(__name__) + CONFIG = user_config.get_config() @@ -186,8 +190,28 @@ def should_run_in_parallel(num_processes: int | None = None) -> bool: .. autofunction:: qiskit.utils::should_run_in_parallel.ignore_user_settings Args: - num_processes: the maximum number of processes requested for use (``None`` implies the - default). + num_processes: the number of processes requested for use (if given). + """ + num_processes = CPU_COUNT if num_processes is None else num_processes + in_parallel = os.getenv("QISKIT_IN_PARALLEL", "FALSE") == "TRUE" + parallel_enabled = CONFIG.get("parallel_enabled", PARALLEL_DEFAULT) + + result = ( + num_processes > 1 + and not in_parallel + and parallel_enabled + ) + + logger.debug( + "Parallelization decision: %s (num_processes=%d, in_parallel=%s, parallel_enabled=%s)", + "PARALLEL" if result else "SERIAL", + num_processes, + in_parallel, + parallel_enabled + ) + + return result + Examples: Temporarily override the configured settings to disable parallelism:: @@ -303,16 +327,40 @@ def func(_): """ task_kwargs = {} if task_kwargs is None else task_kwargs if num_processes is None: - num_processes = default_num_processes() - if len(values) < 2 or not should_run_in_parallel(num_processes): - return [task(value, *task_args, **task_kwargs) for value in values] - work_items = ((task, value, task_args, task_kwargs) for value in values) - - # This isn't a user-set variable; we set this to talk to our own child processes. - previous_in_parallel = os.getenv("QISKIT_IN_PARALLEL", _IN_PARALLEL_ALLOW_PARALLELISM) - os.environ["QISKIT_IN_PARALLEL"] = _IN_PARALLEL_FORBID_PARALLELISM - try: - with ProcessPoolExecutor(max_workers=num_processes) as executor: - return list(executor.map(_task_wrapper, work_items)) - finally: - os.environ["QISKIT_IN_PARALLEL"] = previous_in_parallel + num_processes = CPU_COUNT + if len(values) == 0: + return [] + if len(values) == 1: + return [task(values[0], *task_args, **task_kwargs)] + + if should_run_in_parallel(num_processes): + logger.debug("Executing parallel_map with %d processes for %d items", num_processes, len(values)) + os.environ["QISKIT_IN_PARALLEL"] = "TRUE" + try: + results = [] + with ProcessPoolExecutor(max_workers=num_processes) as executor: + param = ((task, value, task_args, task_kwargs) for value in values) + future = executor.map(_task_wrapper, param) + + results = list(future) + logger.debug("Parallel execution completed successfully") + + except (KeyboardInterrupt, Exception) as error: + logger.debug("Parallel execution failed: %s", str(error)) + if isinstance(error, KeyboardInterrupt): + os.environ["QISKIT_IN_PARALLEL"] = "FALSE" + raise QiskitError("Keyboard interrupt in parallel_map.") from error + # Otherwise just reset parallel flag and error + os.environ["QISKIT_IN_PARALLEL"] = "FALSE" + raise error + + os.environ["QISKIT_IN_PARALLEL"] = "FALSE" + return results + + logger.debug("Executing parallel_map serially for %d items", len(values)) + results = [] + for _, value in enumerate(values): + result = task(value, *task_args, **task_kwargs) + results.append(result) + return results +