6 changes: 5 additions & 1 deletion datafusion-cli/src/functions.rs
@@ -540,7 +540,11 @@ impl TableFunctionImpl for MetadataCacheFunc {
let mut hits_arr = vec![];
let mut extra_arr = vec![];

let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();
let cached_entries = self
.cache_manager
.get_file_metadata_cache()
.expect("Default metadata cache should be present")
.list_entries();

for (path, entry) in cached_entries {
path_arr.push(path.to_string());
68 changes: 68 additions & 0 deletions datafusion-examples/examples/benchmark.rs
@@ -0,0 +1,68 @@
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultFilesMetadataCache,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::future::join_all;
use std::env;
use std::sync::Arc;
use std::time::Instant;

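// Benchmark sketch: each spawned task repeatedly runs an EXPLAIN aggregation
// query over a local Parquet file. The first CLI argument selects which shared
// caches to enable (any string containing "metadata_cache" and/or
// "statistics_cache").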
#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();
println!("Program path: {}", &args[0]);
println!("Arguments: {:?}", &args[1..]);

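// Shared across all tasks: a file-metadata cache capped at 10 GiB
// (10737418240 bytes) and a default file-statistics cache.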
let metadata_cache = Arc::new(DefaultFilesMetadataCache::new(10737418240));
let cache = Arc::new(DefaultFileStatisticsCache::default());

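// Spawn 10 tasks; each builds its own RuntimeEnv and SessionContext but
// reuses the caches above.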
let tasks: Vec<_> = (0..10).map(|i| {
let cache_option = args
    .get(1)
    .cloned()
    .expect("expected a cache option argument, e.g. metadata_cache,statistics_cache");
let metadata_cache = metadata_cache.clone();
let cache = cache.clone();

tokio::spawn(async move {
let mut cache_manager_config = CacheManagerConfig::default();
if cache_option.contains("metadata_cache") {
println!("Adding metadata cache");
cache_manager_config = cache_manager_config.with_file_metadata_cache(Some(metadata_cache));
}
if cache_option.contains("statistics_cache") {
println!("Adding statistics cache");
cache_manager_config = cache_manager_config.with_files_statistics_cache(Some(cache));
}

let runtime_env = RuntimeEnvBuilder::new()
    .with_cache_manager(cache_manager_config)
    .build()
    .expect("Expected the runtime to be created");

let mut session_config = SessionConfig::default();
// Uncomment to pin the partition count:
// session_config.options_mut().execution.target_partitions = 1;

let ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime_env));
let query = "EXPLAIN SELECT SUM(\"AdvEngineID\"), COUNT(*), AVG(\"ResolutionWidth\") FROM hits";

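// Register the local hits.parquet file as a table; adjust the path for your
// environment.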
ctx.register_parquet(
    "hits",
    "/Users/abandeji/Public/workplace/datafusion/datafusion-examples/resources/hits.parquet",
    ParquetReadOptions::default(),
)
.await
.unwrap();
let cache_enabled = ctx.runtime_env().cache_manager.get_file_metadata_cache().is_some();
println!("Metadata cache enabled: {}", cache_enabled);

println!("Thread {} starting", i);
for _ in 0..100 {
let timer = Instant::now();
let df = ctx.sql(query).await.unwrap();

df.to_string().await.unwrap();

let time_taken = timer.elapsed().as_millis();
println!("Time Taken: {}", time_taken);
}
println!("Thread {} completed", i);
})
}).collect();

join_all(tasks).await;
}
30 changes: 15 additions & 15 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -197,7 +197,7 @@ mod tests {
let file_metadata_cache =
ctx.runtime_env().cache_manager.get_file_metadata_cache();
let stats = DFParquetMetadata::new(&store, &meta[0])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
.fetch_statistics(&schema)
.await?;

@@ -208,7 +208,7 @@
assert_eq!(c2_stats.null_count, Precision::Exact(3));

let stats = DFParquetMetadata::new(&store, &meta[1])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
.fetch_statistics(&schema)
.await?;

@@ -391,8 +391,8 @@
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 2);

let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let df_meta = df_meta
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));

// Increases by 3 because cache has no entries yet
df_meta.fetch_metadata().await?;
@@ -422,8 +422,8 @@
assert_eq!(store.request_count(), 10);

// No increase, cache being used
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let df_meta = df_meta
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
let stats = df_meta.fetch_statistics(&schema).await?;
assert_eq!(store.request_count(), 10);

@@ -450,8 +450,8 @@
let ctx = session.state();
let file_metadata_cache =
ctx.runtime_env().cache_manager.get_file_metadata_cache();
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let df_meta = df_meta
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
// Increases by 1 because cache has no entries yet and new session context
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 2);
@@ -475,8 +475,8 @@
let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
assert_eq!(store.request_count(), 4);
// No increase, cache being used
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let df_meta = df_meta
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
let stats = df_meta.fetch_statistics(&schema).await?;
assert_eq!(store.request_count(), 4);

@@ -499,8 +499,8 @@
assert_eq!(store.request_count(), 1);

// No increase because cache has an entry
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let df_meta = df_meta
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone));
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 1);

@@ -557,7 +557,7 @@ mod tests {
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
.fetch_statistics(&schema)
.await?;
assert_eq!(stats.num_rows, Precision::Exact(4));
@@ -635,7 +635,7 @@ mod tests {
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
.fetch_statistics(&schema)
.await?;
assert_eq!(store.request_count(), 6);
@@ -664,7 +664,7 @@

// No increase in request count because cache is not empty
let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_file_metadata_cache(file_metadata_cache.as_ref().map(Arc::clone))
.fetch_statistics(&schema)
.await?;
assert_eq!(store.request_count(), 6);
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/runtime_config.rs
@@ -220,6 +220,7 @@ async fn test_test_metadata_cache_limit() {
.runtime_env()
.cache_manager
.get_file_metadata_cache()
.expect("Default metadata cache should be present")
.cache_limit()
};

6 changes: 4 additions & 2 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -386,7 +386,9 @@ impl FileFormat for ParquetFormat {
let result = DFParquetMetadata::new(store.as_ref(), object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties.as_ref())
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_file_metadata_cache(
file_metadata_cache.as_ref().map(Arc::clone),
)
.with_coerce_int96(coerce_int96)
.fetch_schema_with_location()
.await?;
@@ -447,7 +449,7 @@ impl FileFormat for ParquetFormat {
DFParquetMetadata::new(store, object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties.as_ref())
.with_file_metadata_cache(Some(file_metadata_cache))
.with_file_metadata_cache(file_metadata_cache)
.fetch_statistics(&table_schema)
.await
}
12 changes: 6 additions & 6 deletions datafusion/datasource-parquet/src/reader.rs
@@ -165,13 +165,13 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Option<Arc<dyn FileMetadataCache>>,
}

impl CachedParquetFileReaderFactory {
pub fn new(
store: Arc<dyn ObjectStore>,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Self {
Self {
store,
@@ -208,7 +208,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
inner,
file_metrics,
file_meta,
metadata_cache: Arc::clone(&self.metadata_cache),
metadata_cache: self.metadata_cache.as_ref().map(Arc::clone),
metadata_size_hint,
}))
}
@@ -222,7 +222,7 @@ pub struct CachedParquetFileReader {
store: Arc<dyn ObjectStore>,
pub inner: ParquetObjectReader,
file_meta: FileMeta,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Option<Arc<dyn FileMetadataCache>>,
metadata_size_hint: Option<usize>,
}

@@ -253,7 +253,7 @@ impl AsyncFileReader for CachedParquetFileReader {
#[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
let file_meta = self.file_meta.clone();
let metadata_cache = Arc::clone(&self.metadata_cache);
let metadata_cache = self.metadata_cache.as_ref().map(Arc::clone);

async move {
#[cfg(feature = "parquet_encryption")]
Expand All @@ -265,7 +265,7 @@ impl AsyncFileReader for CachedParquetFileReader {

DFParquetMetadata::new(&self.store, &file_meta.object_meta)
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
.with_file_metadata_cache(metadata_cache)
.with_metadata_size_hint(self.metadata_size_hint)
.fetch_metadata()
.await
25 changes: 11 additions & 14 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
@@ -132,7 +131,7 @@ impl Debug for dyn FileMetadataCache {
pub struct CacheManager {
file_statistic_cache: Option<FileStatisticsCache>,
list_files_cache: Option<ListFilesCache>,
file_metadata_cache: Arc<dyn FileMetadataCache>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
}

impl CacheManager {
@@ -142,16 +141,12 @@ impl CacheManager {

let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone);

let file_metadata_cache = config
.file_metadata_cache
.as_ref()
.map(Arc::clone)
.unwrap_or_else(|| {
Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit))
});
let file_metadata_cache = config.file_metadata_cache.as_ref().map(Arc::clone);

// the cache memory limit might have changed, ensure the limit is updated
file_metadata_cache.update_cache_limit(config.metadata_cache_limit);
// Update cache limit if cache exists
if let Some(ref cache) = file_metadata_cache {
cache.update_cache_limit(config.metadata_cache_limit);
}

Ok(Arc::new(CacheManager {
file_statistic_cache,
@@ -171,13 +166,15 @@
}

/// Get the file embedded metadata cache, if one is configured.
pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
Arc::clone(&self.file_metadata_cache)
pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> {
self.file_metadata_cache.as_ref().map(Arc::clone)
}

/// Get the limit of the file embedded metadata cache, or 0 if none is configured.
pub fn get_metadata_cache_limit(&self) -> usize {
self.file_metadata_cache.cache_limit()
self.file_metadata_cache
.as_ref()
.map_or(0, |cache| cache.cache_limit())
}
}

4 changes: 1 addition & 3 deletions datafusion/execution/src/runtime_env.rs
@@ -331,9 +331,7 @@ impl RuntimeEnvBuilder {
.cache_manager
.get_file_statistic_cache(),
list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
file_metadata_cache: Some(
runtime_env.cache_manager.get_file_metadata_cache(),
),
file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(),
metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
};
