update examples #36

Merged: 6 commits, Mar 21, 2025

MCintegration/mc_multicpu_test.py (83 additions, 39 deletions)

@@ -2,56 +2,100 @@
import torch.distributed as dist
import torch.multiprocessing as mp
import os
import traceback
from integrators import MonteCarlo, MarkovChainMonteCarlo

# Set environment variables before spawning processes
os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost")
os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355")

def init_process(rank, world_size, fn, backend="gloo"):
# Set MASTER_ADDR and MASTER_PORT appropriately
# Assuming environment variables are set by the cluster's job scheduler
master_addr = os.getenv("MASTER_ADDR", "localhost")
master_port = os.getenv("MASTER_PORT", "12355")
backend = "gloo"

os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = master_port

def init_process(rank, world_size, fn, backend=backend):
# try:
# Initialize the process group
dist.init_process_group(backend, rank=rank, world_size=world_size)
# Call the function
fn(rank, world_size)
# except Exception as e:
# print(f"Error in process {rank}: {e}")
# traceback.print_exc()
# # Make sure to clean up
# if dist.is_initialized():
# dist.destroy_process_group()
# # Return non-zero to indicate error
# raise e


def run_mcmc(rank, world_size):
# Instantiate the MarkovChainMonteCarlo class
bounds = [(-1, 1), (-1, 1)]
n_eval = 8000000
batch_size = 10000
n_therm = 10

# Define the function to be integrated (dummy example)
def two_integrands(x, f):
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
f[:, 1] = -torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0)
return f.mean(dim=-1)

# device = torch.device(f"cuda:{rank}" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
mcmc = MarkovChainMonteCarlo(
bounds,
two_integrands,
2,
batch_size=batch_size,
nburnin=n_therm,
device=device,
)
print(world_size)
try:
# Set seed for reproducibility but different for each process
torch.manual_seed(42 + rank)

# Instantiate the MarkovChainMonteCarlo class
bounds = [(-1, 1), (-1, 1)]
# n_eval = 8000000 // world_size # Divide evaluations among processes
n_eval = 8000000
batch_size = 10000
n_therm = 20

# Define the function to be integrated (dummy example)
def two_integrands(x, f):
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
f[:, 1] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
return f.mean(dim=-1)

# Choose device based on availability and rank
if torch.cuda.is_available() and torch.cuda.device_count() > world_size:
device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")

else:
device = torch.device("cpu")

print(f"Process {rank} using device: {device}")

mcmc = MarkovChainMonteCarlo(
bounds=bounds,
f=two_integrands,
f_dim=2,
batch_size=batch_size,
nburnin=n_therm,
device=device,
)

# Call the MarkovChainMonteCarlo method
mcmc_result = mcmc(n_eval)

if rank == 0:
print("MarkovChainMonteCarlo Result:", mcmc_result)

# except Exception as e:
# print(f"Error in run_mcmc for rank {rank}: {e}")
# traceback.print_exc()
# raise e
finally:
# Clean up
if dist.is_initialized():
dist.destroy_process_group()


def test_mcmc(world_size=2):
# Use fewer processes than CPU cores to avoid resource contention
world_size = min(world_size, mp.cpu_count())
print(f"Starting with {world_size} processes")

# Call the MarkovChainMonteCarlo method
mcmc_result = mcmc(
neval=n_eval,
f_dim=2,
# Start processes with proper error handling
mp.spawn(
init_process,
args=(world_size, run_mcmc),
nprocs=world_size,
join=True,
daemon=False,
)

if rank == 0:
# Only rank 0 prints the result
print("MarkovChainMonteCarlo Result:", mcmc_result)

def test_mcmc():
world_size = 8 # Number of processes to launch
mp.spawn(init_process, args=(world_size, run_mcmc), nprocs=world_size, join=True)
# if __name__ == "__main__":
# # Prevent issues with multiprocessing on some platforms
# mp.set_start_method("spawn", force=True)
# test_mcmc(2)
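
For reference, one way to run the updated multi-CPU test directly (outside pytest) is to restore the commented-out entry point above. The sketch below assumes two local CPU workers over the gloo backend, a free port 12355, and an import path that may need adjusting to your checkout:

import torch.multiprocessing as mp

from MCintegration.mc_multicpu_test import test_mcmc  # import path is a guess; adjust as needed

if __name__ == "__main__":
    # Use the "spawn" start method, as the commented-out block suggests,
    # to avoid fork-related issues with torch.distributed on some platforms.
    mp.set_start_method("spawn", force=True)
    test_mcmc(2)  # two worker processes, gloo backend on CPU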

examples/example_1.py (124 additions, 102 deletions)

@@ -14,105 +14,127 @@
# Both integrands are defined over the square [-1,1]×[-1,1]

import torch
from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas, set_seed, get_device

# Set random seed for reproducibility and get computation device
set_seed(42)
device = get_device()


def unit_circle_integrand(x, f):
"""
Indicator function for the unit circle: 1 if point is inside the circle, 0 otherwise.
The true integral value is π (the area of the unit circle).

Args:
x (torch.Tensor): Batch of points in [-1,1]×[-1,1]
f (torch.Tensor): Output tensor to store function values

Returns:
torch.Tensor: Indicator values (0 or 1) for each point
"""
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
return f[:, 0]


def half_sphere_integrand(x, f):
"""
Function representing a half-sphere with radius 1.
The true integral value is 2π/3 (the volume of the half-sphere).

Args:
x (torch.Tensor): Batch of points in [-1,1]×[-1,1]
f (torch.Tensor): Output tensor to store function values

Returns:
torch.Tensor: Height of the half-sphere at each point
"""
f[:, 0] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
return f[:, 0]


# Set parameters
dim = 2 # 2D integration
bounds = [(-1, 1), (-1, 1)] # Integration bounds
n_eval = 6400000 # Number of function evaluations
batch_size = 10000 # Batch size for sampling
n_therm = 20 # Number of thermalization steps for MCMC

# Initialize VEGAS map for adaptive importance sampling
vegas_map = Vegas(dim, device=device, ninc=10)

# PART 1: Unit Circle Integration

# Initialize MC and MCMC integrators for the unit circle
mc_integrator = MonteCarlo(
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size
)
mcmc_integrator = MarkovChainMonteCarlo(
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size, nburnin=n_therm
)

print("Unit Circle Integration Results:")
print("Plain MC:", mc_integrator(n_eval)) # True value: π ≈ 3.14159...
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))

# Train VEGAS map specifically for the unit circle integrand
vegas_map.adaptive_training(batch_size, unit_circle_integrand, alpha=0.5)

# Initialize integrators that use the trained VEGAS map
vegas_integrator = MonteCarlo(
bounds, f=unit_circle_integrand, maps=vegas_map, batch_size=batch_size
)
vegasmcmc_integrator = MarkovChainMonteCarlo(
bounds,
f=unit_circle_integrand,
maps=vegas_map,
batch_size=batch_size,
nburnin=n_therm,
)

print("VEGAS:", vegas_integrator(n_eval))
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))

# PART 2: Half-Sphere Integration

# Reuse the same integrators but with the half-sphere integrand
mc_integrator.f = half_sphere_integrand
mcmc_integrator.f = half_sphere_integrand

print("\nHalf-Sphere Integration Results:")
print("Plain MC:", mc_integrator(n_eval)) # True value: 2π/3 ≈ 2.09440...
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))

# Reset and retrain the VEGAS map for the half-sphere integrand
vegas_map.make_uniform()
vegas_map.adaptive_training(
batch_size, half_sphere_integrand, epoch=10, alpha=0.5)

# Update the integrators to use the half-sphere integrand
vegas_integrator.f = half_sphere_integrand
vegasmcmc_integrator.f = half_sphere_integrand

print("VEGAS:", vegas_integrator(n_eval))
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
import torch.distributed as dist
import torch.multiprocessing as mp
import os
import sys
import traceback
from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas
os.environ["NCCL_DEBUG"] = "OFF"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "OFF"
os.environ["GLOG_minloglevel"] = "2"
os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost")
os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355")

backend = "nccl"

def init_process(rank, world_size, fn, backend=backend):
try:
dist.init_process_group(backend, rank=rank, world_size=world_size)
fn(rank, world_size)
except Exception as e:
traceback.print_exc()
if dist.is_initialized():
dist.destroy_process_group()
raise e

def run_mcmc(rank, world_size):
try:
if rank != 0:
sys.stderr = open(os.devnull, "w")
sys.stdout = open(os.devnull, "w")
torch.manual_seed(42 + rank)

def unit_circle_integrand(x, f):
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
return f[:, 0]


def half_sphere_integrand(x, f):
f[:, 0] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
return f[:, 0]

dim = 2
bounds = [(-1, 1), (-1, 1)]
n_eval = 6400000
batch_size = 40000
n_therm = 20

device = torch.device(f"cuda:{rank}")

vegas_map = Vegas(dim, device=device, ninc=10)

# Monte Carlo and MCMC for Unit Circle
mc_integrator = MonteCarlo(
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size,
device=device
)
mcmc_integrator = MarkovChainMonteCarlo(
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size, nburnin=n_therm,
device=device
)

print("Unit Circle Integration Results:")
print("Plain MC:", mc_integrator(n_eval))
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))

# Train VEGAS map for Unit Circle
vegas_map.adaptive_training(batch_size, unit_circle_integrand, alpha=0.5)
vegas_integrator = MonteCarlo(
bounds, f=unit_circle_integrand, maps=vegas_map, batch_size=batch_size,
device=device
)
vegasmcmc_integrator = MarkovChainMonteCarlo(
bounds,
f=unit_circle_integrand,
maps=vegas_map,
batch_size=batch_size,
nburnin=n_therm,
device=device
)

print("VEGAS:", vegas_integrator(n_eval))
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))

# Monte Carlo and MCMC for Half-Sphere
mc_integrator.f = half_sphere_integrand
mcmc_integrator.f = half_sphere_integrand

print("\nHalf-Sphere Integration Results:")
print("Plain MC:", mc_integrator(n_eval))
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))

vegas_map.make_uniform()
vegas_map.adaptive_training(batch_size, half_sphere_integrand, epoch=10, alpha=0.5)
vegas_integrator.f = half_sphere_integrand
vegasmcmc_integrator.f = half_sphere_integrand

print("VEGAS:", vegas_integrator(n_eval))
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))


except Exception as e:
print(f"Error in run_mcmc for rank {rank}: {e}")
traceback.print_exc()
raise e
finally:
if dist.is_initialized():
dist.destroy_process_group()


def test_mcmc(world_size):
world_size = min(world_size, mp.cpu_count())
try:
mp.spawn(
init_process,
args=(world_size, run_mcmc),
nprocs=world_size,
join=True,
daemon=False,
)
except Exception as e:
print(f"Error in test_mcmc: {e}")

if __name__ == "__main__":
mp.set_start_method("spawn", force=True)
test_mcmc(4)
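
The rewritten example_1.py pins rank r to cuda:r and hardcodes the nccl backend, so it assumes at least world_size GPUs. For a machine without GPUs, a fallback similar to the device selection in MCintegration/mc_multicpu_test.py could be used. The sketch below is illustrative only; the helper name and the >= device-count check are assumptions, not code from this PR:

import torch
import torch.distributed as dist


def init_process_with_fallback(rank, world_size, fn):
    # Hypothetical variant of init_process: use nccl only when every rank can
    # own its own CUDA device, otherwise fall back to gloo on CPU.
    use_cuda = torch.cuda.is_available() and torch.cuda.device_count() >= world_size
    backend = "nccl" if use_cuda else "gloo"
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    try:
        fn(rank, world_size)
    finally:
        if dist.is_initialized():
            dist.destroy_process_group()

Inside run_mcmc the device would then be picked with the same check: torch.device(f"cuda:{rank}") when CUDA is usable, torch.device("cpu") otherwise.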