From 3c75eab7fc6ac6e0b3a88c31859132094ab55d4f Mon Sep 17 00:00:00 2001 From: ibENPC Date: Mon, 29 Nov 2021 14:57:18 +0100 Subject: [PATCH] Use Polars ThreadPool --- polars/polars-core/src/frame/mod.rs | 118 +++++++++++++++------------- 1 file changed, 64 insertions(+), 54 deletions(-) diff --git a/polars/polars-core/src/frame/mod.rs b/polars/polars-core/src/frame/mod.rs index 265dcb21cbdd..a894668aace5 100644 --- a/polars/polars-core/src/frame/mod.rs +++ b/polars/polars-core/src/frame/mod.rs @@ -357,7 +357,7 @@ impl DataFrame { /// Aggregate all the chunks in the DataFrame to a single chunk in parallel. /// This may lead to more peak memory consumption. pub fn as_single_chunk_par(&mut self) -> &mut Self { - self.columns = self.columns.par_iter().map(|s| s.rechunk()).collect(); + self.columns = POOL.install(|| self.columns.par_iter().map(|s| s.rechunk()).collect()); self } @@ -1266,14 +1266,15 @@ impl DataFrame { where I: Iterator + Clone + Sync, { - let new_col = self - .columns - .par_iter() - .map(|s| { - let mut i = iter.clone(); - s.take_iter(&mut i) - }) - .collect::>()?; + let new_col = POOL.install(|| { + self.columns + .par_iter() + .map(|s| { + let mut i = iter.clone(); + s.take_iter(&mut i) + }) + .collect::>() + })?; Ok(DataFrame::new_no_checks(new_col)) } @@ -1312,13 +1313,15 @@ impl DataFrame { .map(|s| s.take_iter_unchecked(&mut iter)) .collect::>() } else { - self.columns - .par_iter() - .map(|s| { - let mut i = iter.clone(); - s.take_iter_unchecked(&mut i) - }) - .collect::>() + POOL.install(|| { + self.columns + .par_iter() + .map(|s| { + let mut i = iter.clone(); + s.take_iter_unchecked(&mut i) + }) + .collect::>() + }) }; DataFrame::new_no_checks(new_col) } @@ -1359,13 +1362,15 @@ impl DataFrame { .map(|s| s.take_opt_iter_unchecked(&mut iter)) .collect::>() } else { - self.columns - .par_iter() - .map(|s| { - let mut i = iter.clone(); - s.take_opt_iter_unchecked(&mut i) - }) - .collect::>() + POOL.install(|| { + self.columns + .par_iter() + .map(|s| { + let mut i = iter.clone(); + s.take_opt_iter_unchecked(&mut i) + }) + .collect::>() + }) }; DataFrame::new_no_checks(new_col) @@ -1988,7 +1993,7 @@ impl DataFrame { /// /// See the method on [Series](../series/enum.Series.html#method.shift) for more info on the `shift` operation. pub fn shift(&self, periods: i64) -> Self { - let col = self.columns.par_iter().map(|s| s.shift(periods)).collect(); + let col = POOL.install(|| self.columns.par_iter().map(|s| s.shift(periods)).collect()); DataFrame::new_no_checks(col) } @@ -2001,11 +2006,12 @@ impl DataFrame { /// /// See the method on [Series](../series/enum.Series.html#method.fill_null) for more info on the `fill_null` operation. pub fn fill_null(&self, strategy: FillNullStrategy) -> Result { - let col = self - .columns - .par_iter() - .map(|s| s.fill_null(strategy)) - .collect::>>()?; + let col = POOL.install(|| { + self.columns + .par_iter() + .map(|s| s.fill_null(strategy)) + .collect::>>() + })?; Ok(DataFrame::new_no_checks(col)) } @@ -2038,7 +2044,7 @@ impl DataFrame { /// +---------+---------+ /// ``` pub fn max(&self) -> Self { - let columns = self.columns.par_iter().map(|s| s.max_as_series()).collect(); + let columns = POOL.install(|| self.columns.par_iter().map(|s| s.max_as_series()).collect()); DataFrame::new_no_checks(columns) } @@ -2071,7 +2077,7 @@ impl DataFrame { /// +-------------------+--------------------+ /// ``` pub fn std(&self) -> Self { - let columns = self.columns.par_iter().map(|s| s.std_as_series()).collect(); + let columns = POOL.install(|| self.columns.par_iter().map(|s| s.std_as_series()).collect()); DataFrame::new_no_checks(columns) } /// Aggregate the columns to their variation values. @@ -2103,7 +2109,7 @@ impl DataFrame { /// +---------+---------+ /// ``` pub fn var(&self) -> Self { - let columns = self.columns.par_iter().map(|s| s.var_as_series()).collect(); + let columns = POOL.install(|| self.columns.par_iter().map(|s| s.var_as_series()).collect()); DataFrame::new_no_checks(columns) } @@ -2136,7 +2142,7 @@ impl DataFrame { /// +---------+---------+ /// ``` pub fn min(&self) -> Self { - let columns = self.columns.par_iter().map(|s| s.min_as_series()).collect(); + let columns = POOL.install(|| self.columns.par_iter().map(|s| s.min_as_series()).collect()); DataFrame::new_no_checks(columns) } @@ -2169,7 +2175,7 @@ impl DataFrame { /// +---------+---------+ /// ``` pub fn sum(&self) -> Self { - let columns = self.columns.par_iter().map(|s| s.sum_as_series()).collect(); + let columns = POOL.install(|| self.columns.par_iter().map(|s| s.sum_as_series()).collect()); DataFrame::new_no_checks(columns) } @@ -2202,11 +2208,12 @@ impl DataFrame { /// +---------+---------+ /// ``` pub fn mean(&self) -> Self { - let columns = self - .columns - .par_iter() - .map(|s| s.mean_as_series()) - .collect(); + let columns = POOL.install(|| { + self.columns + .par_iter() + .map(|s| s.mean_as_series()) + .collect() + }); DataFrame::new_no_checks(columns) } @@ -2239,21 +2246,23 @@ impl DataFrame { /// +---------+---------+ /// ``` pub fn median(&self) -> Self { - let columns = self - .columns - .par_iter() - .map(|s| s.median_as_series()) - .collect(); + let columns = POOL.install(|| { + self.columns + .par_iter() + .map(|s| s.median_as_series()) + .collect() + }); DataFrame::new_no_checks(columns) } /// Aggregate the columns to their quantile values. pub fn quantile(&self, quantile: f64) -> Result { - let columns = self - .columns - .par_iter() - .map(|s| s.quantile_as_series(quantile)) - .collect::>>()?; + let columns = POOL.install(|| { + self.columns + .par_iter() + .map(|s| s.quantile_as_series(quantile)) + .collect::>>() + })?; Ok(DataFrame::new_no_checks(columns)) } @@ -2436,11 +2445,12 @@ impl DataFrame { /// +------+------+------+--------+--------+--------+---------+---------+---------+ /// ``` pub fn to_dummies(&self) -> Result { - let cols = self - .columns - .par_iter() - .map(|s| s.to_dummies()) - .collect::>>()?; + let cols = POOL.install(|| { + self.columns + .par_iter() + .map(|s| s.to_dummies()) + .collect::>>() + })?; accumulate_dataframes_horizontal(cols) }