Skip to content

Commit cc34581

Browse files
authored
Merge pull request #36 from numericalEFT/comment
update examples
2 parents 345fded + f721aae commit cc34581

File tree

4 files changed

+405
-259
lines changed

4 files changed

+405
-259
lines changed

MCintegration/mc_multicpu_test.py

Lines changed: 83 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,100 @@
22
import torch.distributed as dist
33
import torch.multiprocessing as mp
44
import os
5+
import traceback
56
from integrators import MonteCarlo, MarkovChainMonteCarlo
67

8+
# Set environment variables before spawning processes
9+
os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost")
10+
os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355")
711

8-
def init_process(rank, world_size, fn, backend="gloo"):
9-
# Set MASTER_ADDR and MASTER_PORT appropriately
10-
# Assuming environment variables are set by the cluster's job scheduler
11-
master_addr = os.getenv("MASTER_ADDR", "localhost")
12-
master_port = os.getenv("MASTER_PORT", "12355")
12+
backend = "gloo"
1313

14-
os.environ["MASTER_ADDR"] = master_addr
15-
os.environ["MASTER_PORT"] = master_port
1614

15+
def init_process(rank, world_size, fn, backend=backend):
16+
# try:
17+
# Initialize the process group
1718
dist.init_process_group(backend, rank=rank, world_size=world_size)
19+
# Call the function
1820
fn(rank, world_size)
21+
# except Exception as e:
22+
# print(f"Error in process {rank}: {e}")
23+
# traceback.print_exc()
24+
# # Make sure to clean up
25+
# if dist.is_initialized():
26+
# dist.destroy_process_group()
27+
# # Return non-zero to indicate error
28+
# raise e
1929

2030

2131
def run_mcmc(rank, world_size):
22-
# Instantiate the MarkovChainMonteCarlo class
23-
bounds = [(-1, 1), (-1, 1)]
24-
n_eval = 8000000
25-
batch_size = 10000
26-
n_therm = 10
27-
28-
# Define the function to be integrated (dummy example)
29-
def two_integrands(x, f):
30-
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
31-
f[:, 1] = -torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0)
32-
return f.mean(dim=-1)
33-
34-
# device = torch.device(f"cuda:{rank}" if torch.cuda.is_available() else "cpu")
35-
device = torch.device("cpu")
36-
mcmc = MarkovChainMonteCarlo(
37-
bounds,
38-
two_integrands,
39-
2,
40-
batch_size=batch_size,
41-
nburnin=n_therm,
42-
device=device,
43-
)
32+
print(world_size)
33+
try:
34+
# Set seed for reproducibility but different for each process
35+
torch.manual_seed(42 + rank)
36+
37+
# Instantiate the MarkovChainMonteCarlo class
38+
bounds = [(-1, 1), (-1, 1)]
39+
# n_eval = 8000000 // world_size # Divide evaluations among processes
40+
n_eval = 8000000
41+
batch_size = 10000
42+
n_therm = 20
43+
44+
# Define the function to be integrated (dummy example)
45+
def two_integrands(x, f):
46+
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
47+
f[:, 1] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
48+
return f.mean(dim=-1)
49+
50+
# Choose device based on availability and rank
51+
if torch.cuda.is_available() and torch.cuda.device_count() > world_size:
52+
device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")
53+
else:
54+
device = torch.device("cpu")
55+
56+
print(f"Process {rank} using device: {device}")
57+
58+
mcmc = MarkovChainMonteCarlo(
59+
bounds=bounds,
60+
f=two_integrands,
61+
f_dim=2,
62+
batch_size=batch_size,
63+
nburnin=n_therm,
64+
device=device,
65+
)
66+
67+
# Call the MarkovChainMonteCarlo method
68+
mcmc_result = mcmc(n_eval)
69+
70+
if rank == 0:
71+
print("MarkovChainMonteCarlo Result:", mcmc_result)
72+
73+
# except Exception as e:
74+
# print(f"Error in run_mcmc for rank {rank}: {e}")
75+
# traceback.print_exc()
76+
# raise e
77+
finally:
78+
# Clean up
79+
if dist.is_initialized():
80+
dist.destroy_process_group()
81+
82+
83+
def test_mcmc(world_size=2):
84+
# Use fewer processes than CPU cores to avoid resource contention
85+
world_size = min(world_size, mp.cpu_count())
86+
print(f"Starting with {world_size} processes")
4487

45-
# Call the MarkovChainMonteCarlo method
46-
mcmc_result = mcmc(
47-
neval=n_eval,
48-
f_dim=2,
88+
# Start processes with proper error handling
89+
mp.spawn(
90+
init_process,
91+
args=(world_size, run_mcmc),
92+
nprocs=world_size,
93+
join=True,
94+
daemon=False,
4995
)
5096

51-
if rank == 0:
52-
# Only rank 0 prints the result
53-
print("MarkovChainMonteCarlo Result:", mcmc_result)
5497

55-
def test_mcmc():
56-
world_size = 8 # Number of processes to launch
57-
mp.spawn(init_process, args=(world_size, run_mcmc), nprocs=world_size, join=True)
98+
# if __name__ == "__main__":
99+
# # Prevent issues with multiprocessing on some platforms
100+
# mp.set_start_method("spawn", force=True)
101+
# test_mcmc(2)

examples/example_1.py

Lines changed: 124 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -14,105 +14,127 @@
1414
# Both integrands are defined over the square [-1,1]×[-1,1]
1515

1616
import torch
17-
from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas, set_seed, get_device
18-
19-
# Set random seed for reproducibility and get computation device
20-
set_seed(42)
21-
device = get_device()
22-
23-
24-
def unit_circle_integrand(x, f):
25-
"""
26-
Indicator function for the unit circle: 1 if point is inside the circle, 0 otherwise.
27-
The true integral value is π (the area of the unit circle).
28-
29-
Args:
30-
x (torch.Tensor): Batch of points in [-1,1]×[-1,1]
31-
f (torch.Tensor): Output tensor to store function values
32-
33-
Returns:
34-
torch.Tensor: Indicator values (0 or 1) for each point
35-
"""
36-
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
37-
return f[:, 0]
38-
39-
40-
def half_sphere_integrand(x, f):
41-
"""
42-
Function representing a half-sphere with radius 1.
43-
The true integral value is 2π/3 (the volume of the half-sphere).
44-
45-
Args:
46-
x (torch.Tensor): Batch of points in [-1,1]×[-1,1]
47-
f (torch.Tensor): Output tensor to store function values
48-
49-
Returns:
50-
torch.Tensor: Height of the half-sphere at each point
51-
"""
52-
f[:, 0] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
53-
return f[:, 0]
54-
55-
56-
# Set parameters
57-
dim = 2 # 2D integration
58-
bounds = [(-1, 1), (-1, 1)] # Integration bounds
59-
n_eval = 6400000 # Number of function evaluations
60-
batch_size = 10000 # Batch size for sampling
61-
n_therm = 20 # Number of thermalization steps for MCMC
62-
63-
# Initialize VEGAS map for adaptive importance sampling
64-
vegas_map = Vegas(dim, device=device, ninc=10)
65-
66-
# PART 1: Unit Circle Integration
67-
68-
# Initialize MC and MCMC integrators for the unit circle
69-
mc_integrator = MonteCarlo(
70-
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size
71-
)
72-
mcmc_integrator = MarkovChainMonteCarlo(
73-
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size, nburnin=n_therm
74-
)
75-
76-
print("Unit Circle Integration Results:")
77-
print("Plain MC:", mc_integrator(n_eval)) # True value: π ≈ 3.14159...
78-
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))
79-
80-
# Train VEGAS map specifically for the unit circle integrand
81-
vegas_map.adaptive_training(batch_size, unit_circle_integrand, alpha=0.5)
82-
83-
# Initialize integrators that use the trained VEGAS map
84-
vegas_integrator = MonteCarlo(
85-
bounds, f=unit_circle_integrand, maps=vegas_map, batch_size=batch_size
86-
)
87-
vegasmcmc_integrator = MarkovChainMonteCarlo(
88-
bounds,
89-
f=unit_circle_integrand,
90-
maps=vegas_map,
91-
batch_size=batch_size,
92-
nburnin=n_therm,
93-
)
94-
95-
print("VEGAS:", vegas_integrator(n_eval))
96-
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
97-
98-
# PART 2: Half-Sphere Integration
99-
100-
# Reuse the same integrators but with the half-sphere integrand
101-
mc_integrator.f = half_sphere_integrand
102-
mcmc_integrator.f = half_sphere_integrand
103-
104-
print("\nHalf-Sphere Integration Results:")
105-
print("Plain MC:", mc_integrator(n_eval)) # True value: 2π/3 ≈ 2.09440...
106-
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))
107-
108-
# Reset and retrain the VEGAS map for the half-sphere integrand
109-
vegas_map.make_uniform()
110-
vegas_map.adaptive_training(
111-
batch_size, half_sphere_integrand, epoch=10, alpha=0.5)
112-
113-
# Update the integrators to use the half-sphere integrand
114-
vegas_integrator.f = half_sphere_integrand
115-
vegasmcmc_integrator.f = half_sphere_integrand
116-
117-
print("VEGAS:", vegas_integrator(n_eval))
118-
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
17+
import torch.distributed as dist
18+
import torch.multiprocessing as mp
19+
import os
20+
import sys
21+
import traceback
22+
from MCintegration import MonteCarlo, MarkovChainMonteCarlo, Vegas
23+
os.environ["NCCL_DEBUG"] = "OFF"
24+
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "OFF"
25+
os.environ["GLOG_minloglevel"] = "2"
26+
os.environ["MASTER_ADDR"] = os.getenv("MASTER_ADDR", "localhost")
27+
os.environ["MASTER_PORT"] = os.getenv("MASTER_PORT", "12355")
28+
29+
backend = "nccl"
30+
31+
def init_process(rank, world_size, fn, backend=backend):
32+
try:
33+
dist.init_process_group(backend, rank=rank, world_size=world_size)
34+
fn(rank, world_size)
35+
except Exception as e:
36+
traceback.print_exc()
37+
if dist.is_initialized():
38+
dist.destroy_process_group()
39+
raise e
40+
41+
def run_mcmc(rank, world_size):
42+
try:
43+
if rank != 0:
44+
sys.stderr = open(os.devnull, "w")
45+
sys.stdout = open(os.devnull, "w")
46+
torch.manual_seed(42 + rank)
47+
48+
def unit_circle_integrand(x, f):
49+
f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
50+
return f[:, 0]
51+
52+
53+
def half_sphere_integrand(x, f):
54+
f[:, 0] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
55+
return f[:, 0]
56+
57+
dim = 2
58+
bounds = [(-1, 1), (-1, 1)]
59+
n_eval = 6400000
60+
batch_size = 40000
61+
n_therm = 20
62+
63+
device = torch.device(f"cuda:{rank}")
64+
65+
vegas_map = Vegas(dim, device=device, ninc=10)
66+
67+
# Monte Carlo and MCMC for Unit Circle
68+
mc_integrator = MonteCarlo(
69+
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size,
70+
device=device
71+
)
72+
mcmc_integrator = MarkovChainMonteCarlo(
73+
f=unit_circle_integrand, bounds=bounds, batch_size=batch_size, nburnin=n_therm,
74+
device=device
75+
)
76+
77+
print("Unit Circle Integration Results:")
78+
print("Plain MC:", mc_integrator(n_eval))
79+
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))
80+
81+
# Train VEGAS map for Unit Circle
82+
vegas_map.adaptive_training(batch_size, unit_circle_integrand, alpha=0.5)
83+
vegas_integrator = MonteCarlo(
84+
bounds, f=unit_circle_integrand, maps=vegas_map, batch_size=batch_size,
85+
device=device
86+
)
87+
vegasmcmc_integrator = MarkovChainMonteCarlo(
88+
bounds,
89+
f=unit_circle_integrand,
90+
maps=vegas_map,
91+
batch_size=batch_size,
92+
nburnin=n_therm,
93+
device=device
94+
)
95+
96+
print("VEGAS:", vegas_integrator(n_eval))
97+
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
98+
99+
# Monte Carlo and MCMC for Half-Sphere
100+
mc_integrator.f = half_sphere_integrand
101+
mcmc_integrator.f = half_sphere_integrand
102+
103+
print("\nHalf-Sphere Integration Results:")
104+
print("Plain MC:", mc_integrator(n_eval))
105+
print("MCMC:", mcmc_integrator(n_eval, mix_rate=0.5))
106+
107+
vegas_map.make_uniform()
108+
vegas_map.adaptive_training(batch_size, half_sphere_integrand, epoch=10, alpha=0.5)
109+
vegas_integrator.f = half_sphere_integrand
110+
vegasmcmc_integrator.f = half_sphere_integrand
111+
112+
print("VEGAS:", vegas_integrator(n_eval))
113+
print("VEGAS-MCMC:", vegasmcmc_integrator(n_eval, mix_rate=0.5))
114+
115+
116+
except Exception as e:
117+
print(f"Error in run_mcmc for rank {rank}: {e}")
118+
traceback.print_exc()
119+
raise e
120+
finally:
121+
if dist.is_initialized():
122+
dist.destroy_process_group()
123+
124+
125+
def test_mcmc(world_size):
126+
world_size = min(world_size, mp.cpu_count())
127+
try:
128+
mp.spawn(
129+
init_process,
130+
args=(world_size, run_mcmc),
131+
nprocs=world_size,
132+
join=True,
133+
daemon=False,
134+
)
135+
except Exception as e:
136+
print(f"Error in test_mcmc: {e}")
137+
138+
if __name__ == "__main__":
139+
mp.set_start_method("spawn", force=True)
140+
test_mcmc(4)

0 commit comments

Comments
 (0)