diff --git a/MCintegration/mc_multicpu_test.py b/MCintegration/mc_multicpu_test.py
index 37dd491..1be7b9f 100644
--- a/MCintegration/mc_multicpu_test.py
+++ b/MCintegration/mc_multicpu_test.py
@@ -2,56 +2,100 @@
 import torch.distributed as dist
 import torch.multiprocessing as mp
 import os
+import traceback
 from integrators import MonteCarlo, MarkovChainMonteCarlo
 
+# Set environment variables before spawning processes
+os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost")
+os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355")
+
+backend = "gloo"
+
 
-def init_process(rank, world_size, fn, backend="gloo"):
-    # Set MASTER_ADDR and MASTER_PORT appropriately
-    # Assuming environment variables are set by the cluster's job scheduler
-    master_addr = os.getenv("MASTER_ADDR", "localhost")
-    master_port = os.getenv("MASTER_PORT", "12355")
-
-    os.environ["MASTER_ADDR"] = master_addr
-    os.environ["MASTER_PORT"] = master_port
-
-    dist.init_process_group(backend, rank=rank, world_size=world_size)
-    fn(rank, world_size)
+def init_process(rank, world_size, fn, backend=backend):
+    try:
+        # Initialize the process group, then hand control to the worker
+        dist.init_process_group(backend, rank=rank, world_size=world_size)
+        fn(rank, world_size)
+    except Exception:
+        print(f"Error in process {rank}:")
+        traceback.print_exc()
+        # Clean up before propagating the error to mp.spawn
+        if dist.is_initialized():
+            dist.destroy_process_group()
+        raise
 
 
 def run_mcmc(rank, world_size):
-    # Instantiate the MarkovChainMonteCarlo class
-    bounds = [(-1, 1), (-1, 1)]
-    n_eval = 8000000
-    batch_size = 10000
-    n_therm = 10
-
-    # Define the function to be integrated (dummy example)
-    def two_integrands(x, f):
-        f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
-        f[:, 1] = -torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0)
-        return f.mean(dim=-1)
-
-    # device = torch.device(f"cuda:{rank}" if torch.cuda.is_available() else "cpu")
-    device = torch.device("cpu")
-    mcmc = MarkovChainMonteCarlo(
-        bounds,
-        two_integrands,
-        2,
-        batch_size=batch_size,
-        nburnin=n_therm,
-        device=device,
-    )
-
-    # Call the MarkovChainMonteCarlo method
-    mcmc_result = mcmc(
-        neval=n_eval,
-        f_dim=2,
-    )
-    if rank == 0:
-        # Only rank 0 prints the result
-        print("MarkovChainMonteCarlo Result:", mcmc_result)
+    try:
+        # Seed reproducibly, but differently for each process
+        torch.manual_seed(42 + rank)
+
+        # Instantiate the MarkovChainMonteCarlo class
+        bounds = [(-1, 1), (-1, 1)]
+        # n_eval = 8000000 // world_size  # Divide evaluations among processes
+        n_eval = 8000000
+        batch_size = 10000
+        n_therm = 20
+
+        # Define the function to be integrated (dummy example)
+        def two_integrands(x, f):
+            f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
+            f[:, 1] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
+            return f.mean(dim=-1)
+
+        # Choose device based on availability and rank
+        if torch.cuda.is_available() and torch.cuda.device_count() >= world_size:
+            device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")
+        else:
+            device = torch.device("cpu")
+
+        print(f"Process {rank} using device: {device}")
+
+        mcmc = MarkovChainMonteCarlo(
+            bounds=bounds,
+            f=two_integrands,
+            f_dim=2,
+            batch_size=batch_size,
+            nburnin=n_therm,
+            device=device,
+        )
+
+        # Call the MarkovChainMonteCarlo method
+        mcmc_result = mcmc(n_eval)
+
+        if rank == 0:
+            print("MarkovChainMonteCarlo Result:", mcmc_result)
+    except Exception:
+        print(f"Error in run_mcmc for rank {rank}:")
+        traceback.print_exc()
+        raise
+    finally:
+        # Clean up
+        if dist.is_initialized():
+            dist.destroy_process_group()
 
 
-def test_mcmc():
-    world_size = 8  # Number of processes to launch
-    mp.spawn(init_process, args=(world_size, run_mcmc), nprocs=world_size, join=True)
+def test_mcmc(world_size=2):
+    # Use fewer processes than CPU cores to avoid resource contention
+    world_size = min(world_size, mp.cpu_count())
+    print(f"Starting with {world_size} processes")
+
+    # Start processes and wait for all of them to finish
+    mp.spawn(
+        init_process,
+        args=(world_size, run_mcmc),
+        nprocs=world_size,
+        join=True,
+        daemon=False,
+    )
+
+
+if __name__ == "__main__":
+    # Prevent issues with multiprocessing on some platforms
+    mp.set_start_method("spawn", force=True)
+    test_mcmc(2)
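Note: the pattern above — mp.spawn launching init_process, which calls dist.init_process_group before handing off to the worker — is repeated in every example file below. A minimal, self-contained sketch of that pattern (gloo backend on CPU; the port 12356 and the all_reduce payload are illustrative choices, not taken from this repo):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def _worker(rank, world_size):
    # Rendezvous settings must be visible before init_process_group
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12356")
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    try:
        # Each rank contributes one value; all_reduce sums them in place
        t = torch.tensor([float(rank)])
        dist.all_reduce(t, op=dist.ReduceOp.SUM)
        if rank == 0:
            print("sum of ranks:", t.item())  # 0 + 1 = 1.0 for world_size=2
    finally:
        dist.destroy_process_group()

if __name__ == "__main__":
    # fn receives its rank as the first argument; args supplies the rest
    mp.spawn(_worker, args=(2,), nprocs=2, join=True)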
-print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5)) - -# Train VEGAS map specifically for the unit circle integrand -vegas_map.adaptive_training(batch_size, unit_circle_integrand, alpha=0.5) - -# Initialize integrators that use the trained VEGAS map -vegas_integrator = MonteCarlo( - bounds, f=unit_circle_integrand, maps=vegas_map, batch_size=batch_size -) -vegasmcmc_integrator = MarkovChainMonteCarlo( - bounds, - f=unit_circle_integrand, - maps=vegas_map, - batch_size=batch_size, - nburnin=n_therm, -) - -print("VEGAS:", vegas_integrator(n_eval)) -print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5)) - -# PART 2: Half-Sphere Integration - -# Reuse the same integrators but with the half-sphere integrand -mc_integrator.f = half_sphere_integrand -mcmc_integrator.f = half_sphere_integrand - -print("\nHalf-Sphere Integration Results:") -print("Plain MC:", mc_integrator(n_eval)) # True value: 2π/3 ≈ 2.09440... -print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5)) - -# Reset and retrain the VEGAS map for the half-sphere integrand -vegas_map.make_uniform() -vegas_map.adaptive_training( - batch_size, half_sphere_integrand, epoch=10, alpha=0.5) - -# Update the integrators to use the half-sphere integrand -vegas_integrator.f = half_sphere_integrand -vegasmcmc_integrator.f = half_sphere_integrand - -print("VEGAS:", vegas_integrator(n_eval)) -print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5)) +import torch.distributed as dist +import torch.multiprocessing as mp +import os +import sys +import traceback +from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas +os.environ["NCCL_DEBUG"] = "OFF" +os.environ["TORCH_DISTRIBUTED_DEBUG"] = "OFF" +os.environ["GLOG_minloglevel"] = "2" +os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost") +os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355") + +backend = "nccl" + +def init_process(rank, world_size, fn, backend=backend): + try: + dist.init_process_group(backend, rank=rank, world_size=world_size) + fn(rank, world_size) + except Exception as e: + traceback.print_exc() + if dist.is_initialized(): + dist.destroy_process_group() + raise e + +def run_mcmc(rank, world_size): + try: + if rank != 0: + sys.stderr = open(os.devnull, "w") + sys.stdout = open(os.devnull, "w") + torch.manual_seed(42 + rank) + + def unit_circle_integrand(x, f): + f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double() + return f[:, 0] + + + def half_sphere_integrand(x, f): + f[:, 0] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2 + return f[:, 0] + + dim = 2 + bounds = [(-1, 1), (-1, 1)] + n_eval = 6400000 + batch_size = 40000 + n_therm = 20 + + device = torch.device(f"cuda:{rank}") + + vegas_map = Vegas(dim, device=device, ninc=10) + + # Monte Carlo and MCMC for Unit Circle + mc_integrator = MonteCarlo( + f=unit_circle_integrand, bounds=bounds, batch_size=batch_size, + device=device + ) + mcmc_integrator = MarkovChainMonteCarlo( + f=unit_circle_integrand, bounds=bounds, batch_size=batch_size, nburnin=n_therm, + device=device + ) + + print("Unit Circle Integration Results:") + print("Plain MC:", mc_integrator(n_eval)) + print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5)) + + # Train VEGAS map for Unit Circle + vegas_map.adaptive_training(batch_size, unit_circle_integrand, alpha=0.5) + vegas_integrator = MonteCarlo( + bounds, f=unit_circle_integrand, maps=vegas_map, batch_size=batch_size, + device=device + ) + vegasmcmc_integrator = MarkovChainMonteCarlo( + bounds, + f=unit_circle_integrand, + maps=vegas_map, + 
batch_size=batch_size, + nburnin=n_therm, + device=device + ) + + print("VEGAS:", vegas_integrator(n_eval)) + print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5)) + + # Monte Carlo and MCMC for Half-Sphere + mc_integrator.f = half_sphere_integrand + mcmc_integrator.f = half_sphere_integrand + + print("\nHalf-Sphere Integration Results:") + print("Plain MC:", mc_integrator(n_eval)) + print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5)) + + vegas_map.make_uniform() + vegas_map.adaptive_training(batch_size, half_sphere_integrand, epoch=10, alpha=0.5) + vegas_integrator.f = half_sphere_integrand + vegasmcmc_integrator.f = half_sphere_integrand + + print("VEGAS:", vegas_integrator(n_eval)) + print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5)) + + + except Exception as e: + print(f"Error in run_mcmc for rank {rank}: {e}") + traceback.print_exc() + raise e + finally: + if dist.is_initialized(): + dist.destroy_process_group() + + +def test_mcmc(world_size): + world_size = min(world_size, mp.cpu_count()) + try: + mp.spawn( + init_process, + args=(world_size, run_mcmc), + nprocs=world_size, + join=True, + daemon=False, + ) + except Exception as e: + print(f"Error in test_mcmc: {e}") + +if __name__ == "__main__": + mp.set_start_method("spawn", force=True) + test_mcmc(4) \ No newline at end of file diff --git a/examples/example_2.py b/examples/example_2.py index 19db419..f94d00e 100644 --- a/examples/example_2.py +++ b/examples/example_2.py @@ -14,84 +14,111 @@ # The integrand contains three components (f_dim=3) to demonstrate multi-dimensional outputs. import torch -from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas, set_seed, get_device - -# Set random seed for reproducibility and get computation device -set_seed(42) -device = get_device() - - -def sharp_integrands(x, f): - """ - A function with a sharp Gaussian peak centered at (0.5, 0.5, 0.5, 0.5). - The function returns three components: - 1. exp(-200*sum((x_i - 0.5)²)) - 2. x_0 * exp(-200*sum((x_i - 0.5)²)) - 3. 
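Note: the two reference values printed by example_1.py are π (area of the unit circle) and 2π/3 (volume of the half-sphere). A library-independent sanity check of the first, in plain torch — the domain volume 4 and the uniform sampler are the only inputs; nothing here is MCintegration API:

import torch

torch.manual_seed(0)
x = torch.rand(1_000_000, 2) * 2 - 1            # uniform points on [-1, 1]^2
inside = (x.pow(2).sum(dim=1) < 1).double()     # unit-circle indicator
est = 4 * inside.mean()                         # domain volume * mean value
err = 4 * inside.std() / inside.numel() ** 0.5  # one-sigma MC error
print(f"pi ~= {est:.4f} +/- {err:.4f}")         # about 3.1416 +/- 0.0016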
diff --git a/examples/example_2.py b/examples/example_2.py
index 19db419..f94d00e 100644
--- a/examples/example_2.py
+++ b/examples/example_2.py
@@ -14,84 +14,111 @@
 # The integrand contains three components (f_dim=3) to demonstrate multi-dimensional outputs.
 
 import torch
-from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas, set_seed, get_device
-
-# Set random seed for reproducibility and get computation device
-set_seed(42)
-device = get_device()
-
-
-def sharp_integrands(x, f):
-    """
-    A function with a sharp Gaussian peak centered at (0.5, 0.5, 0.5, 0.5).
-    The function returns three components:
-    1. exp(-200*sum((x_i - 0.5)²))
-    2. x_0 * exp(-200*sum((x_i - 0.5)²))
-    3. x_0² * exp(-200*sum((x_i - 0.5)²))
-
-    Args:
-        x (torch.Tensor): Batch of points in [0,1]⁴
-        f (torch.Tensor): Output tensor to store function values
-
-    Returns:
-        torch.Tensor: Mean of the three components for each point
-    """
-    # Compute distance from the center point (0.5, 0.5, 0.5, 0.5)
-    f[:, 0] = torch.sum((x - 0.5) ** 2, dim=-1)
-    # Scale by -200 to create a sharp peak
-    f[:, 0] *= -200
-    # Apply exponential to get Gaussian function
-    f[:, 0].exp_()
-    # Second component: first coordinate times the Gaussian
-    f[:, 1] = f[:, 0] * x[:, 0]
-    # Third component: square of first coordinate times the Gaussian
-    f[:, 2] = f[:, 0] * x[:, 0] ** 2
-    return f.mean(dim=-1)
-
-
-# Set parameters
-dim = 4  # 4D integration
-bounds = [(0, 1)] * dim  # Integration bounds (unit hypercube)
-n_eval = 500000  # Number of function evaluations
-batch_size = 10000  # Batch size for sampling
-n_therm = 20  # Number of thermalization steps for MCMC
-
-# Initialize VEGAS map with finer grid for better adaptation to the sharp peak
-vegas_map = Vegas(dim, device=device, ninc=1000)
-
-# Initialize plain MC and MCMC integrators
-# Note that f_dim=3 to handle the three components of the integrand
-mc_integrator = MonteCarlo(
-    f=sharp_integrands, f_dim=3, bounds=bounds, batch_size=batch_size
-)
-mcmc_integrator = MarkovChainMonteCarlo(
-    f=sharp_integrands, f_dim=3, bounds=bounds, batch_size=batch_size, nburnin=n_therm
-)
-
-print("Sharp Peak Integration Results:")
-print("Plain MC:", mc_integrator(n_eval))
-print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))
-
-# Train VEGAS map to adapt to the sharp peak
-# Using alpha=2.0 for more aggressive grid adaptation
-vegas_map.adaptive_training(
-    batch_size, sharp_integrands, f_dim=3, epoch=10, alpha=2.0)
-
-# Initialize integrators that use the trained VEGAS map
-vegas_integrator = MonteCarlo(
-    bounds, f=sharp_integrands, f_dim=3, maps=vegas_map, batch_size=batch_size
-)
-vegasmcmc_integrator = MarkovChainMonteCarlo(
-    bounds,
-    f=sharp_integrands,
-    f_dim=3,
-    maps=vegas_map,
-    batch_size=batch_size,
-    nburnin=n_therm,
-)
-
-print("VEGAS:", vegas_integrator(n_eval))
-print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
-
-# Expected outcome: VEGAS should significantly outperform plain MC
-# since it adapts the sampling grid to concentrate points near the peak.
+import torch.distributed as dist
+import torch.multiprocessing as mp
+import os
+import sys
+import traceback
+from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas
+
+# Quiet the distributed runtime and set rendezvous defaults before spawning
+os.environ["NCCL_DEBUG"] = "OFF"
+os.environ["TORCH_DISTRIBUTED_DEBUG"] = "OFF"
+os.environ["GLOG_minloglevel"] = "2"
+os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost")
+os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355")
+
+backend = "nccl"
+
+
+def init_process(rank, world_size, fn, backend=backend):
+    try:
+        dist.init_process_group(backend, rank=rank, world_size=world_size)
+        fn(rank, world_size)
+    except Exception:
+        traceback.print_exc()
+        if dist.is_initialized():
+            dist.destroy_process_group()
+        raise
+
+
+def run_mcmc(rank, world_size):
+    try:
+        # Only rank 0 prints; silence the other ranks
+        if rank != 0:
+            sys.stderr = open(os.devnull, "w")
+            sys.stdout = open(os.devnull, "w")
+        torch.manual_seed(42 + rank)
+
+        def sharp_integrands(x, f):
+            # Sharp Gaussian peak centered at (0.5, 0.5, 0.5, 0.5),
+            # plus the same peak weighted by x_0 and x_0**2
+            f[:, 0] = torch.sum((x - 0.5) ** 2, dim=-1)
+            f[:, 0] *= -200
+            f[:, 0].exp_()
+            f[:, 1] = f[:, 0] * x[:, 0]
+            f[:, 2] = f[:, 0] * x[:, 0] ** 2
+            return f.mean(dim=-1)
+
+        dim = 4
+        bounds = [(0, 1)] * dim
+        n_eval = 6400000
+        batch_size = 40000
+        n_therm = 20
+
+        # One GPU per rank (nccl backend requires CUDA devices)
+        device = torch.device(f"cuda:{rank}")
+
+        print(f"Process {rank} using device: {device}")
+
+        vegas_map = Vegas(dim, device=device, ninc=1000)
+
+        # Plain MC and MCMC
+        mc_integrator = MonteCarlo(
+            f=sharp_integrands, f_dim=3, bounds=bounds, batch_size=batch_size,
+            device=device,
+        )
+        mcmc_integrator = MarkovChainMonteCarlo(
+            f=sharp_integrands, f_dim=3, bounds=bounds, batch_size=batch_size,
+            nburnin=n_therm, device=device,
+        )
+
+        print("Sharp Peak Integration Results:")
+        print("Plain MC:", mc_integrator(n_eval))
+        print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))
+
+        # Train VEGAS map
+        vegas_map.adaptive_training(
+            batch_size, sharp_integrands, f_dim=3, epoch=10, alpha=2.0
+        )
+        vegas_integrator = MonteCarlo(
+            bounds, f=sharp_integrands, f_dim=3, maps=vegas_map,
+            batch_size=batch_size, device=device,
+        )
+        vegasmcmc_integrator = MarkovChainMonteCarlo(
+            bounds,
+            f=sharp_integrands,
+            f_dim=3,
+            maps=vegas_map,
+            batch_size=batch_size,
+            nburnin=n_therm,
+            device=device,
+        )
+
+        print("VEGAS:", vegas_integrator(n_eval))
+        print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
+    except Exception:
+        print(f"Error in run_mcmc for rank {rank}:")
+        traceback.print_exc()
+        raise
+    finally:
+        if dist.is_initialized():
+            dist.destroy_process_group()
+
+
+def test_mcmc(world_size):
+    # nccl needs one GPU per rank, so cap by the GPU count, not cpu_count()
+    world_size = min(world_size, torch.cuda.device_count())
+    try:
+        mp.spawn(
+            init_process,
+            args=(world_size, run_mcmc),
+            nprocs=world_size,
+            join=True,
+            daemon=False,
+        )
+    except Exception as e:
+        print(f"Error in test_mcmc: {e}")
+
+
+if __name__ == "__main__":
+    mp.set_start_method("spawn", force=True)
+    test_mcmc(4)
\ No newline at end of file
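Note: the reason the VEGAS map matters so much in example_2.py is that the integrand's mass sits in a tiny ball around (0.5, ..., 0.5), so uniform samples almost all land where f ≈ 0. The sketch below is plain torch, not MCintegration API; sigma = 0.05 is chosen so the Gaussian proposal matches the peak width exactly, an idealized best case for VEGAS-style remapping:

import math
import torch

torch.manual_seed(0)
n, dim = 200_000, 4

def peak(x):  # first component of sharp_integrands
    return torch.exp(-200 * ((x - 0.5) ** 2).sum(dim=-1))

# Uniform sampling over [0, 1]^4: almost every sample misses the peak
xu = torch.rand(n, dim)
fu = peak(xu)

# Importance sampling from N(0.5, sigma^2) per coordinate; with sigma = 0.05
# the proposal is proportional to the integrand inside the box, so the
# weights f/q are nearly constant. Samples outside [0, 1]^4 get weight zero.
sigma = 0.05
xg = 0.5 + sigma * torch.randn(n, dim)
q = torch.exp(-((xg - 0.5) ** 2).sum(dim=-1) / (2 * sigma**2)) / (
    (2 * math.pi) ** (dim / 2) * sigma**dim
)
fg = peak(xg) / q
fg[~((xg > 0) & (xg < 1)).all(dim=-1)] = 0.0

# Exact value: the 4th power of int_0^1 exp(-200 (t - 0.5)^2) dt
exact = (math.sqrt(math.pi / 200) * math.erf(0.5 * math.sqrt(200))) ** dim
print(f"exact     {exact:.6e}")
print(f"uniform   {fu.mean():.6e} +/- {fu.std() / n ** 0.5:.1e}")
print(f"weighted  {fg.mean():.6e} +/- {fg.std() / n ** 0.5:.1e}")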
"localhost") +os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355") -set_seed(42) -device = get_device() +backend = "nccl" +def init_process(rank, world_size, fn, backend=backend): + try: + dist.init_process_group(backend, rank=rank, world_size=world_size) + fn(rank, world_size) + except Exception as e: + traceback.print_exc() + if dist.is_initialized(): + dist.destroy_process_group() + raise e -def func(x, f): - f[:, 0] = torch.log(x[:, 0]) / torch.sqrt(x[:, 0]) - return f[:, 0] +def run_mcmc(rank, world_size): + try: + if rank != 0: + sys.stderr = open(os.devnull, "w") + sys.stdout = open(os.devnull, "w") + torch.manual_seed(42 + rank) + def func(x, f): + f[:, 0] = torch.log(x[:, 0]) / torch.sqrt(x[:, 0]) + return f[:, 0] -dim = 1 -bounds = [[0, 1]] * dim -n_eval = 500000 -batch_size = 10000 -alpha = 2.0 -ninc = 1000 -n_therm = 20 + dim = 1 + bounds = [[0, 1]] * dim + n_eval = 6400000 + batch_size = 40000 + alpha = 2.0 + ninc = 1000 + n_therm = 20 -vegas_map = Vegas(dim, device=device, ninc=ninc) + device = torch.device(f"cuda:{rank}") -# Train VEGAS map -print("Training VEGAS map for log(x)/sqrt(x)... \n") -vegas_map.adaptive_training(batch_size, func, epoch=10, alpha=alpha) + print(f"Process {rank} using device: {device}") -print("Integration Results for log(x)/sqrt(x):") + vegas_map = Vegas(dim, device=device, ninc=ninc) + # Train VEGAS map + print("Training VEGAS map for log(x)/sqrt(x)... \n") + vegas_map.adaptive_training(batch_size, func, epoch=10, alpha=alpha) -# Plain MC Integration -mc_integrator = MonteCarlo(bounds, func, batch_size=batch_size) -print("Plain MC Integral Result:", mc_integrator(n_eval)) + print("Integration Results for log(x)/sqrt(x):") -# MCMC Integration -mcmc_integrator = MarkovChainMonteCarlo( - bounds, func, batch_size=batch_size, nburnin=n_therm -) -print("MCMC Integral Result:", mcmc_integrator(n_eval, mix_rate=0.5)) -# Perform VEGAS integration -vegas_integrator = MonteCarlo(bounds, func, maps=vegas_map, batch_size=batch_size) -res = vegas_integrator(n_eval) + # Plain MC Integration + mc_integrator = MonteCarlo(bounds, func, batch_size=batch_size,device=device) + print("Plain MC Integral Result:", mc_integrator(n_eval)) -print("VEGAS Integral Result:", res) + # MCMC Integration + mcmc_integrator = MarkovChainMonteCarlo( + bounds, func, batch_size=batch_size, nburnin=n_therm,device=device + ) + print("MCMC Integral Result:", mcmc_integrator(n_eval, mix_rate=0.5)) -# VEGAS-MCMC Integration -vegasmcmc_integrator = MarkovChainMonteCarlo( - bounds, func, maps=vegas_map, batch_size=batch_size, nburnin=n_therm -) -res_vegasmcmc = vegasmcmc_integrator(n_eval, mix_rate=0.5) -print("VEGAS-MCMC Integral Result:", res_vegasmcmc) + # Perform VEGAS integration + vegas_integrator = MonteCarlo(bounds, func, maps=vegas_map, batch_size=batch_size,device=device) + res = vegas_integrator(n_eval) + + print("VEGAS Integral Result:", res) + + # VEGAS-MCMC Integration + vegasmcmc_integrator = MarkovChainMonteCarlo( + bounds, func, maps=vegas_map, batch_size=batch_size, nburnin=n_therm,device=device + ) + res_vegasmcmc = vegasmcmc_integrator(n_eval, mix_rate=0.5) + print("VEGAS-MCMC Integral Result:", res_vegasmcmc) + + except Exception as e: + print(f"Error in run_mcmc for rank {rank}: {e}") + traceback.print_exc() + raise e + finally: + if dist.is_initialized(): + dist.destroy_process_group() + + +def test_mcmc(world_size): + world_size = min(world_size, mp.cpu_count()) + try: + mp.spawn( + init_process, + args=(world_size, run_mcmc), + nprocs=world_size, + join=True, 
+ daemon=False, + ) + except Exception as e: + print(f"Error in test_mcmc: {e}") + +if __name__ == "__main__": + mp.set_start_method("spawn", force=True) + test_mcmc(4) \ No newline at end of file
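Note on example_3: the integral has a closed form, int_0^1 log(x)/sqrt(x) dx = -4 (substitute x = t^2 to get 4 int_0^1 log(t) dt = -4), which all four printed results should bracket. A plain-torch check (no MCintegration API) lands near the right mean, but because int_0^1 (log x)^2 / x dx diverges, the uniform-sampling estimator has infinite variance and its reported error bar is unreliable — which is exactly why this example leans on the VEGAS map:

import torch

torch.manual_seed(0)
# clamp away exact zeros so log() cannot return -inf
x = torch.rand(1_000_000, dtype=torch.float64).clamp_min(1e-300)
f = torch.log(x) / torch.sqrt(x)
# Mean is near -4; the +/- understates the true uncertainty, since the
# second moment of this integrand is infinite.
print(f"plain-MC: {f.mean():.4f} +/- {f.std() / len(f) ** 0.5:.4f}")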