diff --git a/haiopy/buffers.py b/haiopy/buffers.py index 41aa771..72ae2f3 100644 --- a/haiopy/buffers.py +++ b/haiopy/buffers.py @@ -4,7 +4,7 @@ from threading import Event from scipy import signal import warnings - +import matplotlib.pyplot as plt class _Buffer(ABC): """Abstract base class for audio buffers for block-wise iteration. @@ -538,3 +538,205 @@ def next(self): def _reset(self): super()._reset() + +class LinearSweepGenerator(_Buffer): + """ + Generator to block wise calculate a linear sweep as described in [#]_. + + Examples: + -------- + + >>> from haiopy.buffers import LinearSweepGenerator + >>> import matplotlib.pyplot as plt + >>> sine = LinearSweepGenerator(440, 128) + >>> blocks = [next(LinearSweepGenerator), next(LinearSweepGenerator), next(LinearSweepGenerator)] + >>> for block in blocks: + >>> plt.plot((block)) + >>> plt.show() + + Source: + .. [#] Farina, Angelo (2000): "Simultaneous measurement of impulse + response and distortion with a swept-sine technique." 108th AES + Convention, Paris: France. + """ + + def __init__(self, + block_size, + amplitude=1, + sweep_duration = 10, + f_1 = 0, + f_2 = 22050, + sampling_rate=44100) -> None: + """ + Initialize a `LinearSweepGenerator`with a given block_size, + amplitude, sweep duration, lower frequency, upper frequency + and samplingrate. + + Parameters + ---------- + block_size : int + The block size in samples. + amplitude: double, optional + The amplitude of the sine. The default is ``1``. + sweep_duration : float, optional + The duration of one sweep in seconds. The default is ``10`` s + f_1 : int, optional + The starting frequency for the sweep in Hz. The default is + ``0`` Hz. + f_2: int, optional + The ending frequency for the sweep in Hz. The default is + ``22050`` Hz. + sampling_rate : int, optional + The sampling rate in Hz. The default is ``44100``. + """ + super().__init__(block_size) + self._amplitude = amplitude + self._sampling_rate = sampling_rate + self._T = sweep_duration + self._f_1 = f_1 + + if f_2 > sampling_rate/2: + self.f_2 = np.floor(sampling_rate/2) + else: + self._f_2 = f_2 + + self._t_start = 0 + self._in_transition = False + + @property + def amplitude(self): + """Return the amplitude of the sweep.""" + return self._amplitude + + @amplitude.setter + def amplitude(self, amplitude): + self.check_if_active() + self._amplitude = amplitude + self._reset() + + @property + def sampling_rate(self): + """Return the sampling rate of the generated sweep.""" + return self._sampling_rate + + @sampling_rate.setter + def sampling_rate(self, sampling_rate): + """Set the sampling rate.""" + self.check_if_active() + self._sampling_rate = sampling_rate + self._phase = 0 + + @property + def n_channels(self): + """Return the number of channels. This is currently always 1.""" + return 1 + + @property + def sweep_duration(self): + """Return the duration for one sweep.""" + return self._T + + @sweep_duration.setter + def sweep_duration(self, sweep_duration): + """Set the duration of one sweep.""" + self.check_if_active() + self._sweep_duration = sweep_duration + self._T = sweep_duration + + @property + def T(self): + """Return the period time T for one sweep.""" + return self._T + + @property + def f_1(self): + """Return lower freuqency limit for sweep.""" + return self._f_1 + + @f_1.setter + def f_1(self, f_1): + """Set the lower frequency of the sweep.""" + self.check_if_active() + self._f_1 = f_1 + self._reset() + + @property + def f_2(self): + """Return upper freuqency limit for sweep.""" + return self._f_2 + + @f_2.setter + def f_2(self, f_2): + """Set the upper frequency of the sweep.""" + self.check_if_active() + self._f_2 = f_2 + self._reset() + + @property + def t_start(self): + """Return current time.""" + return self._t_start + + def _set_block_size(self, block_size): + """Set blocksize.""" + self.check_if_active() + super()._set_block_size(block_size) + self._reset() + + def next(self): + """ + Return the next audio block as numpy array and increase the current + time variable by one block. + If sweep ends inside a block, it fades out and a new sweep begins. + """ + + # if in transition to a new sweep-start (i.e. sweep ends within + # a signal block, not directly at the end) + if self._in_transition: + time_till_end = self._T - self._t_start + n_samples_1 = int(time_till_end * self._sampling_rate) + t_1 = np.arange(n_samples_1) + self._t_start + self._reset() + n_samples_2 = self._block_size - n_samples_1 + t_2 = np.arange(n_samples_2) / self._sampling_rate + t = np.concatenate((t_1, t_2)) + else: # usual iteration + n_samples = int(self._block_size) + t = np.arange(n_samples) / self._sampling_rate + self._t_start + + # [1, page 5] + w_1 = 2 * np.pi * self._f_1 + w_2 = 2 * np.pi * self._f_2 + data = (self._amplitude * + np.sin(w_1 * t + (w_2-w_1) / self._T * t**2 / 2)) + + self._t_start += self._block_size / self._sampling_rate + + # window end of first sweep to reach zero at the end: + # ...and set t_start new: + if self._in_transition: + win_len = 11 + win = np.hanning(win_len) + win_half = win[-int(win_len/2):] + idx = n_samples_1-len(win_half) + data[idx:n_samples_1] = data[idx:n_samples_1]*win_half + + # set start time based on t_2: + # in next iterations t_start can be increased by block/fs again as + # usual + self._t_start = t_2[-1] + 1/self._sampling_rate + self._in_transition = False + + # if we reach desired duration of one sweep, do....: + if (self._t_start >= + (self._T - (self._block_size / self._sampling_rate))): + if self._t_start == self._T: + self._reset() + else: + self._in_transition = True + + return data + + def _reset(self): + """Reset sweep.""" + self._t_start = 0 \ No newline at end of file diff --git a/tests/test_buffers.py b/tests/test_buffers.py index 452fa82..4662c54 100644 --- a/tests/test_buffers.py +++ b/tests/test_buffers.py @@ -6,7 +6,7 @@ SignalBuffer, EmptyBuffer ) -from haiopy.buffers import SineGenerator, NoiseGenerator +from haiopy.buffers import SineGenerator, NoiseGenerator, LinearSweepGenerator import pytest import pyfar as pf from scipy import signal @@ -260,7 +260,6 @@ def test_SineGenerator(): # check if sine generator is active now assert sine.is_active is True - def test_SineGenerator_updates(): frequency = 440 block_size = 512 @@ -434,6 +433,58 @@ def test_NoiseGenerator_updates(): with pytest.raises(BufferError, match="needs to be inactive"): noise.seed = 123 +def test_LinearSweepGenerator(): + block_size = 512 + amplitude = 0.1 + sampling_rate = 44100 + f1 = 0 + f2 = 20000 + sweep_duration = 2 + + sweep = LinearSweepGenerator(block_size, + amplitude, + sweep_duration = sweep_duration, + f_1 = f1, + f_2 = f2, + sampling_rate=sampling_rate) + + # test getters with default + assert sweep.block_size == block_size + assert sweep.sampling_rate == 44100 + assert sweep.amplitude == amplitude + assert sweep.f_1 == f1 + assert sweep.f_2 == f2 + assert sweep.sweep_duration == sweep_duration + + assert sweep.n_channels == 1 + + # check if sweep generator is not active yet + assert sweep.is_active is False + + # check first block + t_start = 0 + n_samples = int(block_size) + t = np.arange(n_samples) / sampling_rate + t_start + w_1 = 2 * np.pi * f1 + w_2 = 2 * np.pi * f2 + sweep_data = (amplitude * + np.sin(w_1 * t + (w_2-w_1) / sweep_duration * t**2 / 2)) + block_data = next(sweep) + npt.assert_array_equal(block_data, sweep_data) + + # check if noise generator is active now + assert sweep.is_active is True + + # check second block + t_start += block_size / sampling_rate + t = np.arange(n_samples) / sampling_rate + t_start + w_1 = 2 * np.pi * f1 + w_2 = 2 * np.pi * f2 + sweep_data = (amplitude * + np.sin(w_1 * t + (w_2-w_1) / sweep_duration * t**2 / 2)) + + block_data = next(sweep) + npt.assert_array_equal(block_data, sweep_data) def test_sampling_rate_setter(): # Test setting the sampling rate, resampling the Signal and updating data