diff --git a/Makefile b/Makefile index 1c83953..29625ff 100644 --- a/Makefile +++ b/Makefile @@ -49,11 +49,12 @@ all: lint mypy test .PHONY: clean clean: - rm -rf `find . -name __pycache__` - rm -f `find . -type f -name '*.py[co]' ` - rm -f `find . -type f -name '*~' ` - rm -f `find . -type f -name '.*~' ` - rm -f `find . -type f -name '*.cpython-*' ` + # TODO -> this kinda wipes a lot of stuff in the python environment, maybe we should be more specific + rm -rf `find . -name __pycache__ -not -path "./.venv/*"` + rm -f `find . -type f -name '*.py[co]' -not -path "./.venv/*"` + rm -f `find . -type f -name '*~' -not -path "./.venv/*"` + rm -f `find . -type f -name '.*~' -not -path "./.venv/*"` + rm -f `find . -type f -name '*.cpython-*' -not -path "./.venv/*"` rm -rf dist rm -rf build rm -rf target diff --git a/downsample_rs/Cargo.toml b/downsample_rs/Cargo.toml index 107469b..0b3b117 100644 --- a/downsample_rs/Cargo.toml +++ b/downsample_rs/Cargo.toml @@ -9,7 +9,9 @@ license = "MIT" [dependencies] # TODO: perhaps use polars? argminmax = { version = "0.6.1", features = ["half"] } -half = { version = "2.3.1", default-features = false , features=["num-traits"], optional = true} +half = { version = "2.3.1", default-features = false, features = [ + "num-traits", +], optional = true } num-traits = { version = "0.2.17", default-features = false } once_cell = "1" rayon = { version = "1.8.0", default-features = false } @@ -35,3 +37,7 @@ harness = false [[bench]] name = "bench_minmaxlttb" harness = false + +[[bench]] +name = "bench_fpcs" +harness = false diff --git a/downsample_rs/benches/bench_fpcs.rs b/downsample_rs/benches/bench_fpcs.rs new file mode 100644 index 0000000..6940c87 --- /dev/null +++ b/downsample_rs/benches/bench_fpcs.rs @@ -0,0 +1,92 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use dev_utils::{config, utils}; +use downsample_rs::fpcs as fpcs_mod; + +fn fpcs_f32_random_array_long(c: &mut Criterion) { + let n = config::ARRAY_LENGTH_LONG; + let y = utils::get_random_array::(n, f32::MIN, f32::MAX); + let x = (0..n).map(|i| i as i32).collect::>(); + + // Non parallel + // --- without x + c.bench_function("fpcs_vanilla_f32", |b| { + b.iter(|| fpcs_mod::vanilla_fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + c.bench_function("fpcs_mm_f32", |b| { + b.iter(|| fpcs_mod::fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_f32_x", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); + + // Parallel + // --- without x + c.bench_function("fpcs_mm_f32_par", |b| { + b.iter(|| fpcs_mod::fpcs_without_x_parallel(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_f32_x_par", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x_parallel( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); +} + +fn fpcs_f32_random_array_50m(c: &mut Criterion) { + let n = 50_000_000; + let y = utils::get_random_array::(n, f32::MIN, f32::MAX); + let x = (0..n).map(|i| i as i32).collect::>(); + + // Non parallel + // --- without x + c.bench_function("fpcs_vanilla_50M_f32", |b| { + b.iter(|| fpcs_mod::vanilla_fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + c.bench_function("fpcs_mm_50M_f32", |b| { + b.iter(|| fpcs_mod::fpcs_without_x(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_50M_f32_x", |b| { + 
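+        // Descriptive note: this closure benchmarks `fpcs_with_x` on the 50M-point random
+        // array with an explicit x axis, reducing it to 2_000 indices; `black_box` keeps the
+        // inputs from being optimised away and `b.iter` lets criterion time repeated calls.
+        // Given the [[bench]] entry added to Cargo.toml above, these benchmarks can
+        // presumably be run with `cargo bench --bench bench_fpcs`.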
b.iter(|| { + fpcs_mod::fpcs_with_x( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); + // Parallel + // --- without x + c.bench_function("fpcs_mm_50M_f32_par", |b| { + b.iter(|| fpcs_mod::fpcs_without_x_parallel(black_box(y.as_slice()), black_box(2_000))) + }); + // --- with x + c.bench_function("fpcs_mm_50M_f32_x_par", |b| { + b.iter(|| { + fpcs_mod::fpcs_with_x_parallel( + black_box(x.as_slice()), + black_box(y.as_slice()), + black_box(2_000), + ) + }) + }); +} + +criterion_group!( + benches, + // + fpcs_f32_random_array_long, + fpcs_f32_random_array_50m, +); +criterion_main!(benches); diff --git a/downsample_rs/src/fpcs.rs b/downsample_rs/src/fpcs.rs new file mode 100644 index 0000000..2ea4c37 --- /dev/null +++ b/downsample_rs/src/fpcs.rs @@ -0,0 +1,362 @@ +use super::minmax; +use super::types::Num; +use argminmax::{ArgMinMax, NaNArgMinMax}; +use num_traits::{AsPrimitive, FromPrimitive}; +use std::ops::Not; + +// ------------------------------- Helper datastructures ------------------------------- +// NOTE: the pub(crate) is added to match the visibility of the fpcs_generic function +pub(crate) struct Point { + x: usize, + y: Ty, +} + +impl Point { + fn update(&mut self, x: usize, y: Ty) { + self.x = x; + self.y = y; + } +} + +#[derive(PartialEq, Eq)] +enum Flag { + Min, + Max, + None, +} + +// --------------------------------- helper functions ---------------------------------- +#[inline(always)] +fn fpcs_inner_core( + min_point: &mut Point, + max_point: &mut Point, + potential_point: &mut Point, + previous_min_flag: &mut Flag, + sampled_indices: &mut Vec, +) { + // if the min is to the left of the max + if min_point.x < max_point.x { + // if the min was selected in the previos bin + if *previous_min_flag == Flag::Min && min_point.x != potential_point.x { + sampled_indices.push(potential_point.x); + } + sampled_indices.push(min_point.x); + + potential_point.update(max_point.x, max_point.y); + min_point.update(max_point.x, max_point.y); + *previous_min_flag = Flag::Min; + } else { + if *previous_min_flag == Flag::Max && max_point.x != potential_point.x { + sampled_indices.push(potential_point.x as usize); + } + sampled_indices.push(max_point.x as usize); + + potential_point.update(min_point.x, min_point.y); + max_point.update(min_point.x, min_point.y); + *previous_min_flag = Flag::Max; + } +} + +#[inline(always)] +fn fpcs_inner_comp( + y: &[Ty], + min_idx: usize, + max_idx: usize, + min_point: &mut Point, + max_point: &mut Point, +) { + // NOTE: the > and >= stem from the pseudocode details of the FPCS algorithm + // NOTE: this comparison is inverted as nan comparisons always return false + if (max_point.y > y[max_idx]).not() { + max_point.update(max_idx, y[max_idx]); + } + if (min_point.y <= y[min_idx]).not() { + min_point.update(min_idx, y[min_idx]); + } +} + +#[inline(always)] +fn fpcs_outer_loop( + arr: &[Ty], + minmax_idxs: Vec, + n_out: usize, + inner_comp_fn: fn(&[Ty], usize, usize, &mut Point, &mut Point), +) -> Vec { + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: arr[0] }; + let mut max_point: Point = Point { x: 0, y: arr[0] }; + let mut min_point: Point = Point { x: 0, y: arr[0] }; + + let mut sampled_indices: Vec = Vec::with_capacity(n_out * 2); + // Prepend first point + sampled_indices.push(0); + + // Process the min/max pairs + for chunk in minmax_idxs.chunks(2) { + if chunk.len() == 2 { + let mut min_idx = chunk[0]; + let mut max_idx = chunk[1]; + + // NOTE: the minmax_idxs are 
not ordered!! + // So we need to check if the min is actually the min + if arr[min_idx] > arr[max_idx] { + min_idx = chunk[1]; + max_idx = chunk[0]; + } + + inner_comp_fn(arr, min_idx, max_idx, &mut min_point, &mut max_point); + + fpcs_inner_core( + &mut min_point, + &mut max_point, + &mut potential_point, + &mut previous_min_flag, + &mut sampled_indices, + ); + } else if chunk.len() == 1 { + // Handle any remaining single element + sampled_indices.push(chunk[0]); + } + } + + // add the last sample + sampled_indices.push(arr.len() as usize - 1); + sampled_indices +} + +// -------------------------------------- MACROs --------------------------------------- +// ----------------------------------- NON-PARALLEL ------------------------------------ + +// ----------- WITH X + +macro_rules! fpcs_with_x { + ($func_name:ident, $trait:ident, $f_minmax:expr, $f_fpcs_inner_comp:expr) => { + pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive, + Ty: Copy + PartialOrd, + { + fpcs_generic(x, y, n_out, $f_minmax, $f_fpcs_inner_comp) + } + }; +} + +fpcs_with_x!( + fpcs_with_x, + ArgMinMax, + minmax::min_max_with_x, + fpcs_inner_comp +); +fpcs_with_x!( + fpcs_with_x_nan, + NaNArgMinMax, + minmax::min_max_with_x_nan, + fpcs_inner_comp +); + +// ----------- WITHOUT X + +macro_rules! fpcs_without_x { + ($func_name:ident, $trait:path, $f_minmax:expr, $f_fpcs_inner_comp:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + fpcs_generic_without_x(arr, n_out, $f_minmax, $f_fpcs_inner_comp) + } + }; +} + +fpcs_without_x!( + fpcs_without_x, + ArgMinMax, + minmax::min_max_without_x, + fpcs_inner_comp +); +fpcs_without_x!( + fpcs_without_x_nan, + NaNArgMinMax, + minmax::min_max_without_x_nan, + fpcs_inner_comp +); + +// ------------------------------------- PARALLEL -------------------------------------- + +// ----------- WITH X + +macro_rules! fpcs_with_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr, $f_fpcs_inner_comp:expr) => { + pub fn $func_name(x: &[Tx], y: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, + Ty: Num + Copy + PartialOrd + Send + Sync, + { + // collect the parrallel iterator to a vector + fpcs_generic(x, y, n_out, $f_argminmax, $f_fpcs_inner_comp) + } + }; +} + +fpcs_with_x_parallel!( + fpcs_with_x_parallel, + ArgMinMax, + minmax::min_max_with_x_parallel, + fpcs_inner_comp +); +fpcs_with_x_parallel!( + fpcs_with_x_parallel_nan, + NaNArgMinMax, + minmax::min_max_with_x_parallel_nan, + fpcs_inner_comp +); + +// ----------- WITHOUT X + +macro_rules! 
fpcs_without_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr, $f_fpcs_inner_comp:expr) => { + pub fn $func_name( + arr: &[Ty], + n_out: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + // Ty: Num + AsPrimitive + Send + Sync, + { + fpcs_generic_without_x(arr, n_out, $f_argminmax, $f_fpcs_inner_comp) + } + }; +} + +fpcs_without_x_parallel!( + fpcs_without_x_parallel, + ArgMinMax, + minmax::min_max_without_x_parallel, + fpcs_inner_comp +); +fpcs_without_x_parallel!( + fpcs_without_x_parallel_nan, + NaNArgMinMax, + minmax::min_max_without_x_parallel_nan, + fpcs_inner_comp +); + +// ------------------------------------- GENERICS -------------------------------------- + +// ----------- WITH X + +#[inline(always)] +pub(crate) fn fpcs_generic, Ty: PartialOrd + Copy>( + x: &[Tx], + y: &[Ty], + n_out: usize, + f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, + fpcs_inner_comp: fn(&[Ty], usize, usize, &mut Point, &mut Point), +) -> Vec { + assert_eq!(x.len(), y.len()); + let mut minmax_idxs = f_minmax(&x[1..(x.len() - 1)], &y[1..(x.len() - 1)], (n_out - 2) * 2); + minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 + return fpcs_outer_loop(y, minmax_idxs, n_out, fpcs_inner_comp); +} + +// ----------- WITHOUT X + +#[inline(always)] +pub(crate) fn fpcs_generic_without_x( + arr: &[Ty], + n_out: usize, + f_minmax: fn(&[Ty], usize) -> Vec, + fpcs_inner_comp: fn(&[Ty], usize, usize, &mut Point, &mut Point), +) -> Vec { + let mut minmax_idxs: Vec = f_minmax(&arr[1..(arr.len() - 1)], (n_out - 2) * 2); + minmax_idxs.iter_mut().for_each(|elem| *elem += 1); // inplace + 1 + return fpcs_outer_loop(arr, minmax_idxs, n_out, fpcs_inner_comp); +} + +// - +// ------------------------------------- LEGACY -------------------------------------- +// - + +pub fn vanilla_fpcs_without_x( + y: &[Ty], + n_out: usize, // NOTE: there will be between [n_out, 2*n_out] output points +) -> Vec { + if n_out >= y.len() { + return (0..y.len()).collect::>(); + } + assert!(n_out >= 3); // avoid division by 0 + + // TODO -> we don't know the size of the output vector + // TODO -> make this more concise in a future version + let mut retain_idxs: Vec = Vec::with_capacity(n_out * 2); + + let mut meta_counter: u32 = 1; + // TODO -> check + let threshold: f64 = y.len() as f64 / (n_out - 2) as f64; + let mut previous_min_flag: Flag = Flag::None; + let mut potential_point: Point = Point { x: 0, y: y[0] }; + let mut max_point: Point = Point { x: 0, y: y[0] }; + let mut min_point: Point = Point { x: 0, y: y[0] }; + + for idx in 1..y.len() { + let val = y[idx]; + + if val > max_point.y { + max_point.update(idx, val); + } + if val < min_point.y { + min_point.update(idx, val); + } + + if (idx > ((meta_counter as f64) * threshold) as usize) || (idx == y.len() - 1) { + // edge case -> previous flag is None + meta_counter += 1; + + // if the min is to the left of the max + if min_point.x < max_point.x { + // if the min was selected in the previos bin + if previous_min_flag == Flag::Min && min_point.x != potential_point.x { + retain_idxs.push(potential_point.x as usize); + } + retain_idxs.push(min_point.x as usize); + + potential_point.update(max_point.x, max_point.y); + min_point.update(max_point.x, max_point.y); + previous_min_flag = Flag::Min; + } else { + if previous_min_flag == Flag::Max && max_point.x != potential_point.x { + retain_idxs.push(potential_point.x as usize); + } + retain_idxs.push(max_point.x as usize); + + potential_point.update(min_point.x, min_point.y); + max_point.update(min_point.x, min_point.y); + 
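+                // Compensation bookkeeping (descriptive note): the max was just retained, so the
+                // unselected min becomes the new potential_point and the running max is reset to
+                // that same point. If the next bin also retains a max (the flag set below stays
+                // Max), the stored potential_point is pushed as well, so the dip between two
+                // consecutive retained maxima is not lost.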
previous_min_flag = Flag::Max; + } + } + } + + // add the last sample + retain_idxs.push(y.len() as usize - 1); + return retain_idxs; +} + +// --------------------------------------- TESTS --------------------------------------- +#[cfg(test)] +mod tests { + use dev_utils::utils; + + use super::fpcs_without_x; + + #[test] + fn test_fpcs_with_x() { + let x = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let y = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let n_out = 3; + // let sampled_indices = fpcs_without_x(&x, &y, n_out); + // TODO -> fix after we have a library + // assert eq(sampled_indices, vec![0, 9]) + // utils::print_vec(&res); + } +} diff --git a/downsample_rs/src/lib.rs b/downsample_rs/src/lib.rs index 409294d..1255bd1 100644 --- a/downsample_rs/src/lib.rs +++ b/downsample_rs/src/lib.rs @@ -11,6 +11,8 @@ pub mod minmaxlttb; pub use minmaxlttb::*; pub mod m4; pub use m4::*; +pub mod fpcs; +pub use fpcs::*; pub(crate) mod helpers; pub(crate) mod searchsorted; pub(crate) mod types; diff --git a/pyproject.toml b/pyproject.toml index 876db6e..83afd94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,10 @@ select = ["E", "F", "I"] line-length = 88 extend-select = ["Q"] ignore = ["E402", "F403"] + +[tool.ruff.per-file-ignores] +"tsdownsample/_python/downsamplers.py" = ["E501"] # NOTE "downsamplers.py" also works + # Formatting [tool.black] diff --git a/src/lib.rs b/src/lib.rs index 9cd30c2..a84fe06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -423,6 +423,52 @@ fn minmaxlttb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } +// --------------------------------------- FPCS --------------------------------------- + +use downsample_rs::fpcs as fpcs_mod; + +// Create a sub module for the FPCS algorithm +#[pymodule] +fn fpcs(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { + // ----------------- SEQUENTIAL + + let sequential_mod = PyModule::new_bound(_py, "sequential")?; + + // ----- WITHOUT X + { + create_pyfuncs_without_x!(fpcs_mod, fpcs_without_x, sequential_mod); + create_pyfuncs_without_x!(@nan fpcs_mod, fpcs_without_x_nan, sequential_mod); + } + + // ----- WITH X + { + create_pyfuncs_with_x!(fpcs_mod, fpcs_with_x, sequential_mod); + create_pyfuncs_with_x!(@nan fpcs_mod, fpcs_with_x_nan, sequential_mod); + } + + // ----------------- PARALLEL + + let parallel_mod = PyModule::new_bound(_py, "parallel")?; + + // ----- WITHOUT X + { + create_pyfuncs_without_x!(fpcs_mod, fpcs_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(@nan fpcs_mod, fpcs_without_x_parallel_nan, parallel_mod); + } + + // ----- WITH X + { + create_pyfuncs_with_x!(fpcs_mod, fpcs_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(@nan fpcs_mod, fpcs_with_x_parallel_nan, parallel_mod); + } + + // Add the sub modules to the module + m.add_submodule(&sequential_mod)?; + m.add_submodule(¶llel_mod)?; + + Ok(()) +} + // ------------------------------- DOWNSAMPLING MODULE ------------------------------ // #[pymodule] // The super module @@ -432,6 +478,7 @@ fn tsdownsample(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pymodule!(m4))?; m.add_wrapped(wrap_pymodule!(lttb))?; m.add_wrapped(wrap_pymodule!(minmaxlttb))?; + m.add_wrapped(wrap_pymodule!(fpcs))?; Ok(()) } diff --git a/tests/test_algos_python_compliance.py b/tests/test_algos_python_compliance.py index 8a64163..8749c93 100644 --- a/tests/test_algos_python_compliance.py +++ b/tests/test_algos_python_compliance.py @@ -2,18 +2,26 @@ import pytest from tsdownsample import ( + 
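+    # NOTE (descriptive comment): FPCSDownsampler / NaNFPCSDownsampler are the new
+    # Rust-backed entry points registered via the fpcs pymodule above; FPCS_py and
+    # NaNFPCS_py (imported below) are the pure-Python reference implementations that
+    # these compliance tests compare them against.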
FPCSDownsampler, LTTBDownsampler, M4Downsampler, MinMaxDownsampler, + MinMaxLTTBDownsampler, + NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, + NaNMinMaxLTTBDownsampler, ) from tsdownsample._python.downsamplers import ( + FPCS_py, LTTB_py, M4_py, MinMax_py, + MinMaxLTTB_py, + NaNFPCS_py, NaNM4_py, NaNMinMax_py, + NaNMinMaxLTTB_py, ) @@ -23,9 +31,13 @@ (MinMaxDownsampler(), MinMax_py()), (M4Downsampler(), M4_py()), (LTTBDownsampler(), LTTB_py()), + (MinMaxLTTBDownsampler(), MinMaxLTTB_py()), + (FPCSDownsampler(), FPCS_py()), # Include NaN downsamplers (NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py()), + (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), + (NaNFPCSDownsampler(), NaNFPCS_py()), ], ) @pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) @@ -48,7 +60,12 @@ def test_resampler_accordance(rust_python_pair, n, n_out): @pytest.mark.parametrize( "rust_python_pair", - [(NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py())], + [ + (NaNMinMaxDownsampler(), NaNMinMax_py()), + (NaNM4Downsampler(), NaNM4_py()), + (NaNMinMaxLTTBDownsampler(), NaNMinMaxLTTB_py()), + (NaNFPCSDownsampler(), NaNFPCS_py()), + ], ) @pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) @pytest.mark.parametrize("n_random_nans", [100, 200, 500, 2000, 5000]) @@ -67,3 +84,33 @@ def test_nan_resampler_accordance(rust_python_pair, n, n_random_nans, n_out): rust_downsampler.downsample(x, y, n_out=n_out), python_downsampler.downsample(x, y, n_out=n_out), ) + + +@pytest.mark.parametrize( + "nan_no_nan_pair", + [ + (NaNFPCS_py(), FPCS_py()), + (NaNM4_py(), M4_py()), + (NaNMinMax_py(), MinMax_py()), + (NaNMinMaxLTTB_py(), MinMaxLTTB_py()), + (NaNFPCSDownsampler(), FPCSDownsampler()), + (NaNM4Downsampler(), M4Downsampler()), + (NaNMinMaxDownsampler(), MinMaxDownsampler()), + (NaNMinMaxLTTBDownsampler(), MinMaxLTTBDownsampler()), + ], +) +@pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) +@pytest.mark.parametrize("n_out", [100, 200, 252]) +def test_nan_no_nan_resampler_accordance(nan_no_nan_pair, n, n_out): + nan_downsampler, no_nan_downsampler = nan_no_nan_pair + x = np.arange(n) + y = np.random.randn(n) + # Wihtout x passed to the downsamplers + nan_result = nan_downsampler.downsample(y, n_out=n_out) + no_nan_result = no_nan_downsampler.downsample(y, n_out=n_out) + assert np.allclose(nan_result, no_nan_result) + + # With x passed to the downsamplers + nan_result = nan_downsampler.downsample(x, y, n_out=n_out) + no_nan_result = no_nan_downsampler.downsample(x, y, n_out=n_out) + assert np.allclose(nan_result, no_nan_result) diff --git a/tests/test_tsdownsample.py b/tests/test_tsdownsample.py index 993faa6..dcc1c58 100644 --- a/tests/test_tsdownsample.py +++ b/tests/test_tsdownsample.py @@ -6,10 +6,12 @@ from tsdownsample import ( # MeanDownsampler,; MedianDownsampler, EveryNthDownsampler, + FPCSDownsampler, LTTBDownsampler, M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, NaNMinMaxLTTBDownsampler, @@ -28,12 +30,14 @@ M4Downsampler(), LTTBDownsampler(), MinMaxLTTBDownsampler(), + FPCSDownsampler(), ] RUST_NAN_DOWNSAMPLERS = [ NaNMinMaxDownsampler(), NaNM4Downsampler(), NaNMinMaxLTTBDownsampler(), + NaNFPCSDownsampler(), ] OTHER_DOWNSAMPLERS = [EveryNthDownsampler()] @@ -167,8 +171,12 @@ def test_downsampling_with_gaps_in_x(downsampler: AbstractDownsampler): idx = np.arange(len(arr)) idx[: len(idx) // 2] += len(idx) // 2 # add large gap in x s_downsampled = 
downsampler.downsample(idx, arr, n_out=100) - assert len(s_downsampled) <= 100 - assert len(s_downsampled) >= 66 + if "FPCS" in downsampler.__class__.__name__: + assert len(s_downsampled) >= 100 + assert len(s_downsampled) <= 200 + else: + assert len(s_downsampled) <= 100 + assert len(s_downsampled) >= 66 @pytest.mark.parametrize("downsampler", generate_rust_downsamplers()) diff --git a/tsdownsample/__init__.py b/tsdownsample/__init__.py index bfed60a..73a17c2 100644 --- a/tsdownsample/__init__.py +++ b/tsdownsample/__init__.py @@ -2,10 +2,12 @@ from .downsamplers import ( EveryNthDownsampler, + FPCSDownsampler, LTTBDownsampler, M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNFPCSDownsampler, NaNM4Downsampler, NaNMinMaxDownsampler, NaNMinMaxLTTBDownsampler, @@ -16,10 +18,12 @@ __all__ = [ "EveryNthDownsampler", + "FPCSDownsampler", "MinMaxDownsampler", "M4Downsampler", "LTTBDownsampler", "MinMaxLTTBDownsampler", + "NaNFPCSDownsampler", "NaNMinMaxDownsampler", "NaNM4Downsampler", "NaNMinMaxLTTBDownsampler", diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 5343f9f..1e8c68d 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -1,4 +1,6 @@ -from typing import Union +from abc import ABC +from enum import Enum +from typing import NamedTuple, Union import numpy as np @@ -76,8 +78,8 @@ def _downsample( # Construct the output array sampled_x = np.empty(n_out, dtype="int64") + # Add the first point sampled_x[0] = 0 - sampled_x[-1] = x.shape[0] - 1 # Convert x & y to int if it is boolean if x.dtype == np.bool_: @@ -91,7 +93,17 @@ def _downsample( LTTB_py._argmax_area( prev_x=x[a], prev_y=y[a], - avg_next_x=np.mean(x[offset[i + 1] : offset[i + 2]]), + # NOTE: In a 100% correct implementation of LTTB the next x average + # should be implemented as follows: + # avg_next_x=np.mean(x[offset[i + 1] : offset[i + 2]]), + # To improve performance we use the following approximation, + # which is the average of the first and last point of the next bucket. + # NOTE: this is not as accurate when x is not sampled equidistantly + # or when the buckets do not contain that many data points, but it: + # (1) aligns with visual perception (visual middle) + # (2) is much faster + # (3) is how the LTTB Rust implementation works + avg_next_x=(x[offset[i + 1]] + x[offset[i + 2] - 1]) / 2.0, avg_next_y=y[offset[i + 1] : offset[i + 2]].mean(), x_bucket=x[offset[i] : offset[i + 1]], y_bucket=y[offset[i] : offset[i + 1]], @@ -113,6 +125,8 @@ def _downsample( ) + offset[-2] ) + # Always include the last point + sampled_x[-1] = x.shape[0] - 1 return sampled_x @@ -255,3 +269,150 @@ def _downsample( # NOTE: we do not use the np.unique so that all indices are retained return np.array(sorted(rel_idxs)) + + +class _MinMaxLTTB_py(AbstractDownsampler, ABC): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler: AbstractDownsampler = None + self.lttb_downsampler: AbstractDownsampler = None + + def _downsample(self, x, y, n_out, **kwargs): + minmax_ratio = kwargs.get("minmax_ratio", 4) + kwargs.pop("minmax_ratio", None) # remove the minmax_ratio from kwargs + + # This is fine for this implementation, as it is only used for testing + if x is None: + x = np.arange(y.shape[0]) + + n_1 = len(x) - 1 + idxs = self.minmax_downsampler.downsample( + x[1:n_1], y[1:n_1], n_out=n_out * minmax_ratio,
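+            # NOTE (descriptive comment): the MinMax pass preselects n_out * minmax_ratio
+            # (default 4) candidate extrema from the interior samples; the first and last
+            # points are excluded here and re-added below, after which LTTB reduces the
+            # candidates to the final n_out indices.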
**kwargs + ) + idxs += 1 + idxs = np.concatenate(([0], idxs, [len(y) - 1])).ravel() + return idxs[ + self.lttb_downsampler.downsample(x[idxs], y[idxs], n_out=n_out, **kwargs) + ] + + +class MinMaxLTTB_py(_MinMaxLTTB_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = MinMax_py() + self.lttb_downsampler = LTTB_py() + + +class NaNMinMaxLTTB_py(_MinMaxLTTB_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = NaNMinMax_py() + self.lttb_downsampler = LTTB_py() + + +class _FPCS_py(AbstractDownsampler, ABC): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler: AbstractDownsampler = None + + def _downsample( + self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **kwargs + ) -> np.ndarray: + # fmt: off + # ------------------------- Helper datastructures ------------------------- + class Flag(Enum): + NONE = -1 # -1: no data points have been retained + MAX = 0 # 0: a max has been retained + MIN = 1 # 1: a min has been retained + + Point = NamedTuple("point", [("x", int), ("y", y.dtype)]) + # ------------------------------------------------------------------------ + + # NOTE: this is fine for this implementation, as it is only used for testing + if x is None: + x = np.arange(y.shape[0]) + + # 0. Downsample the data using the MinMax algorithm + MINMAX_FACTOR = 2 + n_1 = len(x) - 1 + # NOTE: as we include the first and last point, we reduce the number of points + downsampled_idxs = self.minmax_downsampler.downsample( + x[1:n_1], y[1:n_1], n_out=(n_out - 2) * MINMAX_FACTOR + ) + downsampled_idxs += 1 + + previous_min_flag: Flag = Flag.NONE + potential_point = Point(0, 0) + max_point = Point(0, y[0]) + min_point = Point(0, y[0]) + + sampled_indices = [] + sampled_indices.append(0) # prepend the first point + for i in range(0, len(downsampled_idxs), 2): + # get the min and max indices and swap them into the correct order + min_idx, max_idxs = downsampled_idxs[i], downsampled_idxs[i + 1] + if y[min_idx] > y[max_idxs]: + min_idx, max_idxs = max_idxs, min_idx + bin_min = Point(min_idx, y[min_idx]) + bin_max = Point(max_idxs, y[max_idxs]) + + # Use the (nan-aware) comparison function to update the min and max points + # As comparisons with NaN always return False, we inverted the comparison + # the (inverted) > and <= stem from the pseudo code details in the paper + if not (max_point.y > bin_max.y): + max_point = bin_max + if not (min_point.y <= bin_min.y): + min_point = bin_min + + # if the min is to the left of the max + if min_point.x < max_point.x: + # if the min was also selected in the previous bin + if previous_min_flag == Flag.MIN and min_point.x != potential_point.x: + # Both adjacent samplings retain MinPoint, and PotentialPoint and + # MinPoint are not the same point + sampled_indices.append(potential_point.x) + + sampled_indices.append(min_point.x) # receiving min_point before max_point -> retain min_point + potential_point = max_point # update potential point to unselected max_point + min_point = max_point # update min_point to unselected max_point + previous_min_flag = 
Flag.MIN # min_point has been selected + + else: + if previous_min_flag == Flag.MAX and max_point.x != potential_point.x: + # # Both adjacent samplings retain MaxPoint, and PotentialPoint and + # MaxPoint are not the same point + sampled_indices.append(potential_point.x) + + sampled_indices.append(max_point.x) # receiving max_point b4 min_point -> retain max_point + potential_point = min_point # update potential point to unselected min_point + max_point = min_point # update max_point to unselected min_point + previous_min_flag = Flag.MAX # max_point has been selected + + sampled_indices.append(len(y) - 1) # append the last point + # fmt: on + return np.array(sampled_indices, dtype=np.int64) + + +class FPCS_py(_FPCS_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = MinMax_py() + + +class NaNFPCS_py(_FPCS_py): + def __init__( + self, check_contiguous=True, x_dtype_regex_list=None, y_dtype_regex_list=None + ): + super().__init__(check_contiguous, x_dtype_regex_list, y_dtype_regex_list) + self.minmax_downsampler = NaNMinMax_py() diff --git a/tsdownsample/downsamplers.py b/tsdownsample/downsamplers.py index 93519b7..4c31db0 100644 --- a/tsdownsample/downsamplers.py +++ b/tsdownsample/downsamplers.py @@ -136,6 +136,36 @@ def downsample( ) +class FPCSDownsampler(AbstractRustDownsampler): + """Downsampler that uses the Feature Preserving Compensated Sampling (FPCS) + algorithm. + + FPCS paper: https://doi.org/10.1109/TVCG.2024.3456375 + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.fpcs + + def downsample(self, *args, n_out: int, parallel: bool = False, **_): + return super().downsample(*args, n_out=n_out, parallel=parallel) + + +class NaNFPCSDownsampler(AbstractRustNaNDownsampler): + """Downsampler that uses the Feature Preserving Compensated Sampling (FPCS) + algorithm. If the y data contains NaNs, the indices of these NaNs are returned. + + FPCS paper: https://doi.org/10.1109/TVCG.2024.3456375 + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.fpcs + + def downsample(self, *args, n_out: int, parallel: bool = False, **_): + return super().downsample(*args, n_out=n_out, parallel=parallel) + + # ------------------ EveryNth Downsampler ------------------
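
For completeness, a minimal usage sketch of the new Python API introduced in this diff, assuming the extension is built with these changes (array sizes, dtypes, and variable names below are purely illustrative):

    import numpy as np
    from tsdownsample import FPCSDownsampler, NaNFPCSDownsampler

    y = np.random.randn(1_000_000).astype(np.float32)
    x = np.arange(y.shape[0])

    # FPCS returns an array of selected indices; per the notes above it may contain
    # between roughly n_out and 2 * n_out entries rather than exactly n_out.
    idxs = FPCSDownsampler().downsample(x, y, n_out=1_000)
    plot_x, plot_y = x[idxs], y[idxs]

    # NaN-aware variant: the indices of NaN values are retained in the output.
    # x may be omitted, and parallel=True selects the parallel Rust implementation.
    y[::100_000] = np.nan
    nan_idxs = NaNFPCSDownsampler().downsample(y, n_out=1_000, parallel=True)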