diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index 3ec446c51583..4a5df19dff30 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -540,7 +540,11 @@ impl TableFunctionImpl for MetadataCacheFunc {
         let mut hits_arr = vec![];
         let mut extra_arr = vec![];
 
-        let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();
+        let cached_entries = self
+            .cache_manager
+            .get_file_metadata_cache()
+            .expect("Default metadata cache should be present")
+            .list_entries();
 
         for (path, entry) in cached_entries {
             path_arr.push(path.to_string());
diff --git a/datafusion-examples/examples/benchmark.rs b/datafusion-examples/examples/benchmark.rs
new file mode 100644
index 000000000000..d265323b856e
--- /dev/null
+++ b/datafusion-examples/examples/benchmark.rs
@@ -0,0 +1,68 @@
+use datafusion::execution::cache::cache_manager::CacheManagerConfig;
+use datafusion::execution::cache::cache_unit::{
+    DefaultFileStatisticsCache, DefaultFilesMetadataCache,
+};
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+use futures::future::join_all;
+use std::env;
+use std::sync::Arc;
+use std::time::Instant;
+
+#[tokio::main]
+async fn main() {
+    let args: Vec<String> = env::args().collect();
+    println!("Program path: {}", &args[0]);
+    println!("Arguments: {:?}", &args[1..]);
+
+    let metadata_cache = Arc::new(DefaultFilesMetadataCache::new(10737418240));
+    let cache = Arc::new(DefaultFileStatisticsCache::default());
+
+    let tasks: Vec<_> = (0..10)
+        .map(|i| {
+            let cache_option = args[1].clone();
+            let metadata_cache = metadata_cache.clone();
+            let cache = cache.clone();
+
+            tokio::spawn(async move {
+                let mut cache_manager_config = CacheManagerConfig::default();
+                if cache_option.contains("metadata_cache") {
+                    println!("Adding metadata cache");
+                    cache_manager_config = cache_manager_config
+                        .with_file_metadata_cache(Some(metadata_cache));
+                }
+                if cache_option.contains("statistics_cache") {
+                    println!("Adding statistics cache");
+                    cache_manager_config =
+                        cache_manager_config.with_files_statistics_cache(Some(cache));
+                }
+
+                let rt_builder = RuntimeEnvBuilder::new()
+                    .with_cache_manager(cache_manager_config)
+                    .build()
+                    .expect("Expected the runtime to be created");
+
+                let mut session_config = SessionConfig::default();
+                let config_options = session_config.options_mut();
+                //config_options.execution.target_partitions = 1;
+
+                let ctx = SessionContext::new_with_config_rt(
+                    session_config,
+                    Arc::new(rt_builder),
+                );
+                let query = "EXPLAIN SELECT SUM(\"AdvEngineID\"), COUNT(*), AVG(\"ResolutionWidth\") FROM hits";
+
+                ctx.register_parquet(
+                    "hits",
+                    "/Users/abandeji/Public/workplace/datafusion/datafusion-examples/resources/hits.parquet",
+                    ParquetReadOptions::default(),
+                )
+                .await
+                .unwrap();
+                let cache_enabled =
+                    ctx.runtime_env().cache_manager.get_file_metadata_cache();
+                println!("Cache enabled: {:?}", cache_enabled);
+
+                println!("Thread {} starting", i);
+                for _ in 0..100 {
+                    let timer = Instant::now();
+                    let df = ctx.sql(query).await.unwrap();
+
+                    df.to_string().await.unwrap();
+
+                    let time_taken = timer.elapsed().as_millis();
+                    println!("Time Taken: {}", time_taken);
+                }
+                println!("Thread {} completed", i);
+            })
+        })
+        .collect();
+
+    join_all(tasks).await;
+}
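
With `get_file_metadata_cache()` now returning an `Option`, the CLI table function above simply `expect`s the cache, and the example prints whatever handle it gets back. A caller that must tolerate a disabled cache can branch on the `Option` instead; a minimal sketch (the helper name `cached_entry_count` is hypothetical, and it assumes `list_entries()` returns a collection with a `len()`):

```rust
use std::sync::Arc;
use datafusion::execution::cache::cache_manager::CacheManager;

// Hypothetical helper: count cached entries, treating "cache disabled" as empty.
fn cached_entry_count(cache_manager: &Arc<CacheManager>) -> usize {
    match cache_manager.get_file_metadata_cache() {
        Some(cache) => cache.list_entries().len(),
        // After this change the metadata cache may be absent entirely.
        None => 0,
    }
}
```
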
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 088c4408fff5..36e52d76d270 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -197,7 +197,7 @@ mod tests {
         let file_metadata_cache =
             ctx.runtime_env().cache_manager.get_file_metadata_cache();
         let stats = DFParquetMetadata::new(&store, &meta[0])
-            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
             .fetch_statistics(&schema)
             .await?;
 
@@ -208,7 +208,7 @@ mod tests {
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
 
         let stats = DFParquetMetadata::new(&store, &meta[1])
-            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
             .fetch_statistics(&schema)
             .await?;
 
@@ -391,8 +391,8 @@ mod tests {
         df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
-        let df_meta =
-            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let df_meta = df_meta
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
 
         // Increases by 3 because cache has no entries yet
         df_meta.fetch_metadata().await?;
@@ -422,8 +422,8 @@ mod tests {
         assert_eq!(store.request_count(), 10);
 
         // No increase, cache being used
-        let df_meta =
-            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let df_meta = df_meta
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
         let stats = df_meta.fetch_statistics(&schema).await?;
         assert_eq!(store.request_count(), 10);
@@ -450,8 +450,8 @@ mod tests {
         let ctx = session.state();
         let file_metadata_cache =
             ctx.runtime_env().cache_manager.get_file_metadata_cache();
-        let df_meta =
-            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let df_meta = df_meta
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
         // Increases by 1 because cache has no entries yet and new session context
         df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
@@ -475,8 +475,8 @@ mod tests {
         let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
         assert_eq!(store.request_count(), 4);
 
         // No increase, cache being used
-        let df_meta =
-            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let df_meta = df_meta
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
         let stats = df_meta.fetch_statistics(&schema).await?;
         assert_eq!(store.request_count(), 4);
@@ -499,8 +499,8 @@ mod tests {
         assert_eq!(store.request_count(), 1);
 
         // No increase because cache has an entry
-        let df_meta =
-            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let df_meta = df_meta
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
         df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 1);
@@ -557,7 +557,7 @@ mod tests {
         let file_metadata_cache =
             state.runtime_env().cache_manager.get_file_metadata_cache();
         let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
-            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
             .fetch_statistics(&schema)
             .await?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
@@ -635,7 +635,7 @@ mod tests {
         let file_metadata_cache =
             state.runtime_env().cache_manager.get_file_metadata_cache();
         let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
-            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
            .fetch_statistics(&schema)
             .await?;
         assert_eq!(store.request_count(), 6);
@@ -664,7 +664,7 @@ mod tests {
         // No increase in request count because cache is not empty
         let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
-            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
             .fetch_statistics(&schema)
             .await?;
         assert_eq!(store.request_count(), 6);
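
These test hunks all repeat the same `as_ref().map(Arc::clone)` dance to copy the optional handle without consuming it. Since `Arc` is `Clone`, a plain `.clone()` on the `Option` is equivalent; the explicit form just makes the shallow copy obvious. A hypothetical helper (not part of this PR) that would centralize the pattern:

```rust
use std::sync::Arc;
use datafusion::execution::cache::cache_manager::FileMetadataCache;

// Hypothetical: clone an optional shared cache handle without moving it.
// Equivalent to `cache.clone()`, but spells out that only the Arc is copied.
fn clone_cache(
    cache: &Option<Arc<dyn FileMetadataCache>>,
) -> Option<Arc<dyn FileMetadataCache>> {
    cache.as_ref().map(Arc::clone)
}
```
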
diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs
index 9627d7bccdb0..6afe6d39a31a 100644
--- a/datafusion/core/tests/sql/runtime_config.rs
+++ b/datafusion/core/tests/sql/runtime_config.rs
@@ -220,6 +220,7 @@ async fn test_test_metadata_cache_limit() {
             .runtime_env()
             .cache_manager
             .get_file_metadata_cache()
+            .expect("Default metadata cache should be present")
             .cache_limit()
     };
 
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 963c1d77950c..b18b1b4cfd19 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -386,7 +386,9 @@ impl FileFormat for ParquetFormat {
             let result = DFParquetMetadata::new(store.as_ref(), object)
                 .with_metadata_size_hint(self.metadata_size_hint())
                 .with_decryption_properties(file_decryption_properties.as_ref())
-                .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+                .with_file_metadata_cache(
+                    file_metadata_cache.as_ref().map(Arc::clone),
+                )
                 .with_coerce_int96(coerce_int96)
                 .fetch_schema_with_location()
                 .await?;
@@ -447,7 +449,7 @@ impl FileFormat for ParquetFormat {
         DFParquetMetadata::new(store, object)
             .with_metadata_size_hint(self.metadata_size_hint())
             .with_decryption_properties(file_decryption_properties.as_ref())
-            .with_file_metadata_cache(Some(file_metadata_cache))
+            .with_file_metadata_cache(file_metadata_cache)
             .fetch_statistics(&table_schema)
             .await
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index d0c008ad35cf..d8618cc6119a 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -165,13 +165,13 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
 #[derive(Debug)]
 pub struct CachedParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,
-    metadata_cache: Arc<dyn FileMetadataCache>,
+    metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 }
 
 impl CachedParquetFileReaderFactory {
     pub fn new(
         store: Arc<dyn ObjectStore>,
-        metadata_cache: Arc<dyn FileMetadataCache>,
+        metadata_cache: Option<Arc<dyn FileMetadataCache>>,
     ) -> Self {
         Self {
             store,
@@ -208,7 +208,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
             inner,
             file_metrics,
             file_meta,
-            metadata_cache: Arc::clone(&self.metadata_cache),
+            metadata_cache: self.metadata_cache.as_ref().map(Arc::clone),
             metadata_size_hint,
         }))
     }
@@ -222,7 +222,7 @@ pub struct CachedParquetFileReader {
     store: Arc<dyn ObjectStore>,
     pub inner: ParquetObjectReader,
     file_meta: FileMeta,
-    metadata_cache: Arc<dyn FileMetadataCache>,
+    metadata_cache: Option<Arc<dyn FileMetadataCache>>,
     metadata_size_hint: Option<usize>,
 }
 
@@ -253,7 +253,7 @@ impl AsyncFileReader for CachedParquetFileReader {
         #[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>,
     ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
         let file_meta = self.file_meta.clone();
-        let metadata_cache = Arc::clone(&self.metadata_cache);
+        let metadata_cache = self.metadata_cache.as_ref().map(Arc::clone);
 
         async move {
             #[cfg(feature = "parquet_encryption")]
@@ -265,7 +265,7 @@ impl AsyncFileReader for CachedParquetFileReader {
             DFParquetMetadata::new(&self.store, &file_meta.object_meta)
                 .with_decryption_properties(file_decryption_properties)
-                .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
+                .with_file_metadata_cache(metadata_cache)
                 .with_metadata_size_hint(self.metadata_size_hint)
                 .fetch_metadata()
                 .await
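
Because `CachedParquetFileReaderFactory::new` now takes an `Option`, callers can forward whatever the `CacheManager` hands back without unwrapping. A minimal sketch, assuming the factory is reachable at this path and using a local object store purely for illustration:

```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use datafusion_datasource_parquet::reader::CachedParquetFileReaderFactory;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;

fn make_reader_factory(ctx: &SessionContext) -> CachedParquetFileReaderFactory {
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    // The Option returned by the cache manager now matches the factory's
    // parameter type, so it can be passed straight through.
    CachedParquetFileReaderFactory::new(
        store,
        ctx.runtime_env().cache_manager.get_file_metadata_cache(),
    )
}
```
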
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index 3e0f4065d13f..80fac27d1531 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::cache::cache_unit::DefaultFilesMetadataCache;
 use crate::cache::CacheAccessor;
 use datafusion_common::{Result, Statistics};
 use object_store::path::Path;
@@ -132,7 +131,7 @@ impl Debug for dyn FileMetadataCache {
 pub struct CacheManager {
     file_statistic_cache: Option<FileStatisticsCache>,
     list_files_cache: Option<ListFilesCache>,
-    file_metadata_cache: Arc<dyn FileMetadataCache>,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 }
 
 impl CacheManager {
@@ -142,16 +141,12 @@ impl CacheManager {
         let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);
 
-        let file_metadata_cache = config
-            .file_metadata_cache
-            .as_ref()
-            .map(Arc::clone)
-            .unwrap_or_else(|| {
-                Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
-            });
+        let file_metadata_cache = config.file_metadata_cache.as_ref().map(Arc::clone);
 
-        // the cache memory limit might have changed, ensure the limit is updated
-        file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
+        // Update cache limit if cache exists
+        if let Some(ref cache) = file_metadata_cache {
+            cache.update_cache_limit(config.metadata_cache_limit);
+        }
 
         Ok(Arc::new(CacheManager {
             file_statistic_cache,
@@ -171,13 +166,15 @@ impl CacheManager {
     }
 
     /// Get the file embedded metadata cache.
-    pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
-        Arc::clone(&self.file_metadata_cache)
+    pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> {
+        self.file_metadata_cache.as_ref().map(Arc::clone)
     }
 
     /// Get the limit of the file embedded metadata cache.
     pub fn get_metadata_cache_limit(&self) -> usize {
-        self.file_metadata_cache.cache_limit()
+        self.file_metadata_cache
+            .as_ref()
+            .map_or(0, |cache| cache.cache_limit())
     }
 }
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index db045a8b7e8a..609ace1f3b90 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -331,9 +331,7 @@ impl RuntimeEnvBuilder {
             file_statistic_cache: runtime_env
                 .cache_manager
                 .get_file_statistic_cache(),
             list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
-            file_metadata_cache: Some(
-                runtime_env.cache_manager.get_file_metadata_cache(),
-            ),
+            file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(),
             metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
         };
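
Net effect: `CacheManager` no longer falls back to a `DefaultFilesMetadataCache` of its own, so metadata caching becomes strictly opt-in and `get_file_metadata_cache()` returns `None` unless a cache was configured. A minimal sketch of opting in, using only the builder calls already exercised by the example above (the 1 GiB limit is an arbitrary choice):

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_metadata_cache() -> Result<SessionContext> {
    // Without this explicit opt-in, get_file_metadata_cache() now returns None.
    let cache = Arc::new(DefaultFilesMetadataCache::new(1024 * 1024 * 1024));
    let config = CacheManagerConfig::default().with_file_metadata_cache(Some(cache));
    let runtime = RuntimeEnvBuilder::new().with_cache_manager(config).build()?;
    Ok(SessionContext::new_with_config_rt(
        SessionConfig::default(),
        Arc::new(runtime),
    ))
}
```
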