Update unittests #33

Merged · merged 4 commits into from · Mar 18, 2025
Changes from 3 commits
10 changes: 5 additions & 5 deletions MCintegration/integrators.py
@@ -233,7 +233,7 @@ def __call__(self, neval, nblock=32, verbose=-1, **kwargs):

if verbose > 0:
print(
f"nblock = {nblock}, n_steps_perblock = {epoch_perblock}, batch_size = {self.batch_size}, actual neval = {self.batch_size*epoch_perblock*nblock}"
f"nblock = {nblock}, n_steps_perblock = {epoch_perblock}, batch_size = {self.batch_size}, actual neval = {self.batch_size * epoch_perblock * nblock}"
)

config = Configuration(
@@ -356,13 +356,13 @@ def __call__(
else:
nblock = epoch // nsteps_perblock
n_meas_perblock = nsteps_perblock // meas_freq
assert (
n_meas_perblock > 0
), f"neval ({neval}) should be larger than batch_size * nblock * meas_freq ({self.batch_size} * {nblock} * {meas_freq})"
assert n_meas_perblock > 0, (
f"neval ({neval}) should be larger than batch_size * nblock * meas_freq ({self.batch_size} * {nblock} * {meas_freq})"
)

if verbose > 0:
print(
f"nblock = {nblock}, n_meas_perblock = {n_meas_perblock}, meas_freq = {meas_freq}, batch_size = {self.batch_size}, actual neval = {self.batch_size*nsteps_perblock*nblock}"
f"nblock = {nblock}, n_meas_perblock = {n_meas_perblock}, meas_freq = {meas_freq}, batch_size = {self.batch_size}, actual neval = {self.batch_size * nsteps_perblock * nblock}"
)

config = Configuration(
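The two updated messages in this file report an "actual neval" that can be smaller than the requested `neval`, because the evaluation budget is rounded down into whole blocks. The sketch below illustrates that bookkeeping; the `epoch = neval // batch_size` step and the exact integer rounding are assumptions read off the printed messages and the surrounding context lines, not a copy of the integrator's implementation.

```python
# Sketch only: how a requested neval gets quantized into blocks.
# Assumption: epoch = neval // batch_size (total steps per walker).
def block_layout(neval, batch_size, nblock=32, nsteps_perblock=None, meas_freq=1):
    epoch = neval // batch_size
    if nsteps_perblock is None:
        # MonteCarlo-style call: nblock is fixed, steps per block are derived.
        epoch_perblock = epoch // nblock
        return nblock, epoch_perblock, batch_size * epoch_perblock * nblock
    # MarkovChainMonteCarlo-style call: steps per block are fixed, nblock is derived.
    nblock = epoch // nsteps_perblock
    n_meas_perblock = nsteps_perblock // meas_freq
    assert n_meas_perblock > 0, (
        "neval should be larger than batch_size * nblock * meas_freq"
    )
    return nblock, n_meas_perblock, batch_size * nsteps_perblock * nblock


print(block_layout(10000, batch_size=1000, nblock=10))  # (10, 1, 10000)
print(block_layout(500, batch_size=1000, nblock=10))    # (10, 0, 0)
```

Under this reading, a call like `neval=500` with `batch_size=1000` leaves zero steps per block, which is presumably what triggers the `UserWarning` checked by the new `test_block_size_warning` below.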
228 changes: 187 additions & 41 deletions MCintegration/integrators_test.py
@@ -1,10 +1,84 @@
import unittest
from unittest.mock import patch, MagicMock
import os
import torch
import numpy as np
from integrators import Integrator, MonteCarlo, MarkovChainMonteCarlo
from integrators import get_ip, get_open_port, setup
from integrators import gaussian, random_walk

# from base import LinearMap, Uniform
from maps import Configuration
from base import EPSILON


class TestIntegrators(unittest.TestCase):
@patch("socket.socket")
def test_get_ip(self, mock_socket):
# Mock the socket behavior
mock_socket_instance = mock_socket.return_value
mock_socket_instance.getsockname.return_value = ("192.168.1.1", 12345)
ip = get_ip()
self.assertEqual(ip, "192.168.1.1")

@patch("socket.socket")
def test_get_open_port(self, mock_socket):
# Mock the socket behavior
mock_socket_instance = mock_socket.return_value.__enter__.return_value
mock_socket_instance.getsockname.return_value = ("0.0.0.0", 54321)
port = get_open_port()
self.assertEqual(port, 54321)

@patch("torch.distributed.init_process_group")
@patch("torch.cuda.set_device")
@patch.dict(os.environ, {"LOCAL_RANK": "0"})
def test_setup(self, mock_set_device, mock_init_process_group):
setup(backend="gloo")
mock_init_process_group.assert_called_once_with(backend="gloo")
mock_set_device.assert_called_once_with(0)

def test_random_walk(self):
# Test random_walk function with default parameters
dim = 3
device = "cpu"
dtype = torch.float32
u = torch.rand(dim, device=device, dtype=dtype)
step_size = 0.2

new_u = random_walk(dim, device, dtype, u, step_size=step_size)

# Validate output shape and range
self.assertEqual(new_u.shape, u.shape)
self.assertTrue(torch.all(new_u >= 0) and torch.all(new_u <= 1))

# Test with custom step size
custom_step_size = 0.5
new_u_custom = random_walk(dim, device, dtype, u, step_size=custom_step_size)
self.assertEqual(new_u_custom.shape, u.shape)
self.assertTrue(torch.all(new_u_custom >= 0) and torch.all(new_u_custom <= 1))

def test_gaussian(self):
# Test gaussian function with default parameters
dim = 3
device = "cpu"
dtype = torch.float32
u = torch.rand(dim, device=device, dtype=dtype)

mean = torch.zeros_like(u)
std = torch.ones_like(u)

new_u = gaussian(dim, device, dtype, u, mean=mean, std=std)

# Validate output shape
self.assertEqual(new_u.shape, u.shape)

# Test with custom mean and std
custom_mean = torch.full_like(u, 0.5)
custom_std = torch.full_like(u, 0.1)
new_u_custom = gaussian(dim, device, dtype, u, mean=custom_mean, std=custom_std)

self.assertEqual(new_u_custom.shape, u.shape)
self.assertTrue(torch.all(new_u_custom > custom_mean - 3 * custom_std))
self.assertTrue(torch.all(new_u_custom < custom_mean + 3 * custom_std))


class TestIntegrator(unittest.TestCase):
@@ -25,6 +99,46 @@
self.assertTrue(hasattr(integrator.maps, "device"))
self.assertTrue(hasattr(integrator.maps, "dtype"))

@patch("MCintegration.maps.CompositeMap")
@patch("MCintegration.base.LinearMap")
def test_initialization_with_maps(self, mock_linear_map, mock_composite_map):
# Mock the LinearMap and CompositeMap
mock_linear_map_instance = MagicMock()
mock_linear_map.return_value = mock_linear_map_instance
mock_composite_map_instance = MagicMock()
mock_composite_map.return_value = mock_composite_map_instance

# Create a mock map
mock_map = MagicMock()
mock_map.device = "cpu"
mock_map.dtype = torch.float32
mock_map.forward_with_detJ.return_value = (torch.rand(10, 2), torch.rand(10))

# Initialize Integrator with maps
integrator = Integrator(
bounds=self.bounds, f=self.f, maps=mock_map, batch_size=self.batch_size
)

# Assertions
self.assertEqual(integrator.dim, 2)
self.assertEqual(integrator.batch_size, 1000)
self.assertEqual(integrator.f_dim, 1)
self.assertTrue(hasattr(integrator.maps, "forward_with_detJ"))
self.assertTrue(hasattr(integrator.maps, "device"))
self.assertTrue(hasattr(integrator.maps, "dtype"))

integrator = Integrator(
bounds=self.bounds,
f=self.f,
maps=mock_map,
batch_size=self.batch_size,
device="cpu",
)

# Assertions
self.assertEqual(integrator.device, "cpu")
self.assertTrue(hasattr(integrator.maps, "device"))

def test_bounds_conversion(self):
# Test various input types
test_cases = [
@@ -69,11 +183,11 @@
with self.assertRaises(error_type):
Integrator(bounds=bounds, f=self.f)

def test_device_handling(self):
if torch.cuda.is_available():
integrator = Integrator(bounds=self.bounds, f=self.f, device="cuda")
self.assertTrue(integrator.bounds.is_cuda)
self.assertTrue(integrator.maps.device == "cuda")
# def test_device_handling(self):
# if torch.cuda.is_available():
# integrator = Integrator(bounds=self.bounds, f=self.f, device="cuda")
# self.assertTrue(integrator.bounds.is_cuda)
# self.assertTrue(integrator.maps.device == "cuda")

def test_dtype_handling(self):
dtypes = [torch.float32, torch.float64]
@@ -167,6 +281,27 @@
# Should not raise warning
self.mc(neval=neval, nblock=nblock)

def test_block_size_warning(self):
mc = MonteCarlo(bounds=self.bounds, f=self.simple_integral, batch_size=1000)
with self.assertWarns(UserWarning):
mc(neval=500, nblock=10) # neval too small for nblock

def test_varying_nblock(self):
test_cases = [
(10000, 10), # Standard case
(10000, 1), # Single block
(10000, 100), # Many blocks
]

for neval, nblock in test_cases:
with self.subTest(neval=neval, nblock=nblock):
result = self.mc(neval=neval, nblock=nblock)
if hasattr(result, "mean"):
value = result.mean
else:
value = result

self.assertAlmostEqual(float(value), 1.0, delta=0.1)


class TestMarkovChainMonteCarlo(unittest.TestCase):
def setUp(self):
@@ -237,29 +372,40 @@
value = result
self.assertAlmostEqual(float(value), 1.0, delta=tolerance)

# def test_mix_rate_sensitivity(self):
# # Modified mix rate test to be more robust
# mix_rates = [0.0, 0.5, 1.0]
# results = []
def test_sample_acceptance(self):
config = Configuration(
self.mcmc.batch_size,
self.mcmc.dim,
self.mcmc.f_dim,
self.mcmc.device,
self.mcmc.dtype,
)
config.u, config.detJ = self.mcmc.q0.sample_with_detJ(self.mcmc.batch_size)
config.x, detj = self.mcmc.maps.forward_with_detJ(config.u)
config.detJ *= detj
config.weight = torch.rand(self.mcmc.batch_size, device=self.mcmc.device)

# for mix_rate in mix_rates:
# accumulated_error = 0
# n_trials = 3 # Run multiple trials for each mix_rate
self.mcmc.sample(config, nsteps=1, mix_rate=0.5)

# for _ in range(n_trials):
# result = self.mcmc(neval=50000, mix_rate=mix_rate, nblock=10)
# if hasattr(result, "mean"):
# value = result.mean
# error = result.sdev
# else:
# value = result
# error = abs(float(value) - 1.0)
# accumulated_error += error
# Validate acceptance logic
self.assertTrue(torch.all(config.weight >= EPSILON))
self.assertEqual(config.u.shape, config.x.shape)

# results.append(accumulated_error / n_trials)
def test_varying_mix_rate(self):
test_cases = [
(0.1, 0.2), # Low mix rate
(0.5, 0.1), # Medium mix rate
(0.9, 0.05), # High mix rate
]

# # We expect moderate mix rates to have lower average error
# self.assertLess(results[1], max(results[0], results[2]))
for mix_rate, tolerance in test_cases:
with self.subTest(mix_rate=mix_rate):
result = self.mcmc(neval=50000, mix_rate=mix_rate, nblock=10)
if hasattr(result, "mean"):
value = result.mean
else:
value = result

self.assertAlmostEqual(float(value), 1.0, delta=tolerance)


class TestDistributedFunctionality(unittest.TestCase):
@@ -271,26 +417,26 @@
self.assertEqual(integrator.rank, 0)
self.assertEqual(integrator.world_size, 1)

@unittest.skipIf(not torch.distributed.is_available(), "Distributed not available")
def test_multi_gpu_consistency(self):
if torch.cuda.device_count() >= 2:
bounds = torch.tensor([[0.0, 1.0]], dtype=torch.float64)
f = lambda x, fx: torch.ones_like(x)
# @unittest.skipIf(not torch.distributed.is_available(), "Distributed not available")
# def test_multi_gpu_consistency(self):
# if torch.cuda.device_count() >= 2:
# bounds = torch.tensor([[0.0, 1.0]], dtype=torch.float64)
# f = lambda x, fx: torch.ones_like(x)

# Create two integrators on different devices
integrator1 = Integrator(bounds=bounds, f=f, device="cuda:0")
integrator2 = Integrator(bounds=bounds, f=f, device="cuda:1")
# # Create two integrators on different devices
# integrator1 = Integrator(bounds=bounds, f=f, device="cuda:0")
# integrator2 = Integrator(bounds=bounds, f=f, device="cuda:1")

# Results should be consistent across devices
result1 = integrator1(neval=10000)
result2 = integrator2(neval=10000)
# # Results should be consistent across devices
# result1 = integrator1(neval=10000)
# result2 = integrator2(neval=10000)

if hasattr(result1, "mean"):
value1, value2 = result1.mean, result2.mean
else:
value1, value2 = result1, result2
# if hasattr(result1, "mean"):
# value1, value2 = result1.mean, result2.mean
# else:
# value1, value2 = result1, result2

self.assertAlmostEqual(float(value1), float(value2), places=1)
# self.assertAlmostEqual(float(value1), float(value2), places=1)


if __name__ == "__main__":
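On the tests this PR comments out (`test_device_handling` above and `test_multi_gpu_consistency` here): an alternative worth noting is to keep such checks behind `unittest.skipIf` guards so CPU-only runners report them as skipped rather than dropping them. Below is a rough sketch, reusing the bounds, integrand, and assertions from the commented-out code and assuming the same flat `from integrators import Integrator` import used by this test module; it is not part of the PR.

```python
import unittest

import torch

from integrators import Integrator


class TestCudaOnlyChecks(unittest.TestCase):
    # Sketch only: the CUDA assertions commented out in this PR, kept as
    # tests that skip cleanly on machines without a GPU.
    @unittest.skipIf(not torch.cuda.is_available(), "CUDA not available")
    def test_device_handling(self):
        bounds = torch.tensor([[0.0, 1.0]], dtype=torch.float64)
        f = lambda x, fx: torch.ones_like(x)  # integrand from the commented-out multi-GPU test
        integrator = Integrator(bounds=bounds, f=f, device="cuda")
        self.assertTrue(integrator.bounds.is_cuda)
        self.assertTrue(integrator.maps.device == "cuda")
```

The same guard pattern, with an added `torch.cuda.device_count() >= 2` condition, would cover the multi-GPU consistency check as well.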