diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..fd37c6bf3040f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -18,14 +18,18 @@ //! [`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; -use crate::row_filter::build_projection_read_plan; +use crate::row_filter::{ + FilterCandidate, ParquetReadPlan, build_projection_read_plan, + build_row_filter_candidates, row_filter_from_candidates, +}; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, - apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, + apply_file_schema_type_coercions, coerce_int96_to_resolution, }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; +use datafusion_datasource::file_stream::SharedWorkSource; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -136,6 +140,14 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Shared work queue of sibling FileStreams, if any. When present, this + /// morselizer may split large parquet files into row-group-sized chunks + /// and donate all-but-one back onto this queue so idle siblings steal + /// them instead of sitting idle behind a single hot file. + pub shared_work_source: Option, + /// Output schema (table schema with projection applied). Computed + /// once at morselizer construction so every file open reuses it. 
+ pub output_schema: SchemaRef, } impl fmt::Debug for ParquetMorselizer { @@ -190,6 +202,9 @@ impl Morselizer for ParquetMorselizer { /// PruneWithBloomFilters /// | /// v +/// SplitAndDonate +/// | +/// v /// BuildStream /// | /// v @@ -224,6 +239,11 @@ enum ParquetOpenState { LoadBloomFilters(BoxFuture<'static, Result>), /// Pruning with preloaded Bloom Filters PruneWithBloomFilters(Box), + /// Apply file-level LIMIT pruning (runs after range/stats/bloom so it + /// sees which row groups are still in scope) and donate the survivors + /// in 1-RG chunks back to the shared work queue. Stealers pop a chunk + /// with a finalized access plan and skip every earlier pruning stage. + SplitAndDonate(Box), /// Builds the final reader stream /// /// TODO: split state as this currently does both I/O and CPU work. @@ -247,6 +267,7 @@ impl fmt::Debug for ParquetOpenState { ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters", ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters", + ParquetOpenState::SplitAndDonate(_) => "SplitAndDonate", ParquetOpenState::BuildStream(_) => "BuildStream", ParquetOpenState::Ready(_) => "Ready", ParquetOpenState::Done => "Done", @@ -289,6 +310,38 @@ struct PreparedParquetOpen { preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, + /// Shared work queue used to donate row-group chunks of this file to + /// sibling streams. `None` when sibling stealing is disabled. + shared_work_source: Option, +} + +/// Extension carried on a donated `PartitionedFile`. +/// +/// Donation happens after the donor has already run every pre-scan stage +/// (file-level pruning, metadata load, filter preparation, page index +/// load, stats / bloom / limit pruning). The chunk packages the full +/// result so the stealer's state machine can start directly at +/// `BuildStream` — no footer round-trip, no pruning, no predicate build. 
+#[derive(Clone)] +pub(crate) struct ParquetOpenChunk { + pub access_plan: ParquetAccessPlan, + pub reader_metadata: ArrowReaderMetadata, + pub options: ArrowReaderOptions, + pub physical_file_schema: SchemaRef, + pub predicate: Option>, + pub projection: ProjectionExprs, + pub pruning_predicate: Option>, + pub page_pruning_predicate: Option>, + pub row_filter_candidates: Option>>, + pub read_plan: Arc, +} + +impl fmt::Debug for ParquetOpenChunk { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetOpenChunk") + .field("access_plan", &self.access_plan) + .finish_non_exhaustive() + } } /// State of [`ParquetOpenState`] @@ -308,6 +361,16 @@ struct FiltersPreparedParquetOpen { loaded: MetadataLoadedParquetOpen, pruning_predicate: Option>, page_pruning_predicate: Option>, + /// Row-filter conjunct candidates built once here and shared with + /// sibling stealers via `ParquetOpenChunk` so each open doesn't + /// redo the schema-walk + ProjectionMask build. Only the cheap + /// per-open metric binding happens at `build_stream` time. + row_filter_candidates: Option>>, + /// Projection read plan (ProjectionMask + projected schema), + /// deterministic for a given (projection, physical_file_schema, + /// parquet_schema) triple. Built once here and reused in + /// `build_stream` so stealers don't redo the schema walk. 
+ read_plan: Arc, } /// State of [`ParquetOpenState`] @@ -394,8 +457,12 @@ impl ParquetOpenState { Ok(ParquetOpenState::LoadBloomFilters(future)) } ParquetOpenState::PruneWithBloomFilters(loaded) => Ok( - ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())), + ParquetOpenState::SplitAndDonate(Box::new(loaded.prune_bloom_filters())), ), + ParquetOpenState::SplitAndDonate(prepared) => { + let next = prepared.split_and_donate()?; + Ok(ParquetOpenState::BuildStream(Box::new(next))) + } ParquetOpenState::BuildStream(prepared) => { Ok(ParquetOpenState::Ready(prepared.build_stream()?)) } @@ -447,6 +514,20 @@ impl fmt::Debug for ParquetMorselPlanner { impl ParquetMorselPlanner { fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> Result { + // A donated chunk carries the full handoff state: jump straight + // to `BuildStream` and skip every pruning stage. + if let Some(chunk) = file + .extensions + .as_ref() + .and_then(|ext| ext.downcast_ref::()) + .cloned() + { + let built = morselizer.build_stealer_state(file, chunk)?; + return Ok(Self { + state: ParquetOpenState::BuildStream(Box::new(built)), + }); + } + let prepared = morselizer.prepare_open_file(file)?; #[cfg(feature = "parquet_encryption")] let state = ParquetOpenState::Start { @@ -537,6 +618,7 @@ impl ParquetMorselizer { ) -> Result { let file_range = partitioned_file.range.clone(); let extensions = partitioned_file.extensions.clone(); + let shared_work_source = self.shared_work_source.clone(); let file_name = partitioned_file.object_meta.location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); @@ -554,13 +636,8 @@ impl ParquetMorselizer { &self.metrics, )?; - // Calculate the output schema from the original projection (before literal replacement) - // so we get correct field names from column references let logical_file_schema = Arc::clone(self.table_schema.file_schema()); - let output_schema = Arc::new( - self.projection - 
.project_schema(self.table_schema.table_schema())?, - ); + let output_schema = Arc::clone(&self.output_schema); // Build a combined map for replacing column references with literal values. // This includes: @@ -658,6 +735,103 @@ impl ParquetMorselizer { preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, + shared_work_source, + }) + } + + /// Construct the state for a donated row-group chunk: the stealer + /// starts directly at [`ParquetOpenState::BuildStream`]. We skip the + /// entire pre-scan pipeline (file-level pruning, metadata load, + /// filter preparation, page-index load, stats / bloom / limit + /// pruning) by reusing the donor's already-computed state carried + /// on the `PartitionedFile`. + fn build_stealer_state( + &self, + partitioned_file: PartitionedFile, + chunk: ParquetOpenChunk, + ) -> Result { + let file_name = partitioned_file.object_meta.location.to_string(); + let file_metrics = + ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); + let baseline_metrics = BaselineMetrics::new(&self.metrics, self.partition_index); + let metadata_size_hint = partitioned_file + .metadata_size_hint + .or(self.metadata_size_hint); + let async_file_reader: Box = + self.parquet_file_reader_factory.create_reader( + self.partition_index, + partitioned_file.clone(), + metadata_size_hint, + &self.metrics, + )?; + let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .with_category(MetricCategory::Rows) + .global_counter("num_predicate_creation_errors"); + let logical_file_schema = Arc::clone(self.table_schema.file_schema()); + let output_schema = Arc::clone(&self.output_schema); + + let ParquetOpenChunk { + access_plan, + reader_metadata, + options, + physical_file_schema, + predicate, + projection, + pruning_predicate, + page_pruning_predicate, + row_filter_candidates, + read_plan, + } = chunk; + + let prepared = PreparedParquetOpen { + partition_index: 
self.partition_index, + partitioned_file, + file_range: None, + extensions: None, + file_name, + file_metrics, + baseline_metrics, + file_pruner: None, + metadata_size_hint, + metrics: self.metrics.clone(), + parquet_file_reader_factory: Arc::clone(&self.parquet_file_reader_factory), + async_file_reader, + batch_size: self.batch_size, + logical_file_schema, + physical_file_schema, + output_schema, + projection, + predicate, + reorder_predicates: self.reorder_filters, + pushdown_filters: self.pushdown_filters, + force_filter_selections: self.force_filter_selections, + enable_page_index: self.enable_page_index, + enable_bloom_filter: self.enable_bloom_filter, + enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, + limit: self.limit, + coerce_int96: self.coerce_int96, + expr_adapter_factory: Arc::clone(&self.expr_adapter_factory), + predicate_creation_errors, + max_predicate_cache_size: self.max_predicate_cache_size, + reverse_row_groups: self.reverse_row_groups, + preserve_order: self.preserve_order, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + shared_work_source: None, + }; + Ok(RowGroupsPrunedParquetOpen { + prepared: FiltersPreparedParquetOpen { + loaded: MetadataLoadedParquetOpen { + prepared, + reader_metadata, + options, + }, + pruning_predicate, + page_pruning_predicate, + row_filter_candidates, + read_plan, + }, + row_groups: RowGroupAccessPlanFilter::new(access_plan), }) } } @@ -839,6 +1013,38 @@ impl MetadataLoadedParquetOpen { None }; + // Build row-filter candidates once here — the expensive part of + // pushdown-filter construction (schema walk, ProjectionMask + // build, cost-based reorder). Stealers of donated chunks reuse + // this via `ParquetOpenChunk`. 
+ let row_filter_candidates = if let Some(predicate) = prepared + .pushdown_filters + .then_some(prepared.predicate.as_ref()) + .flatten() + { + match build_row_filter_candidates( + predicate, + &physical_file_schema, + reader_metadata.metadata(), + prepared.reorder_predicates, + ) { + Ok(Some(cs)) => Some(Arc::new(cs)), + Ok(None) => None, + Err(e) => { + debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + None + } + } + } else { + None + }; + + let read_plan = Arc::new(build_projection_read_plan( + prepared.projection.expr_iter(), + &physical_file_schema, + reader_metadata.parquet_schema(), + )); + Ok(FiltersPreparedParquetOpen { loaded: MetadataLoadedParquetOpen { prepared, @@ -847,6 +1053,8 @@ impl MetadataLoadedParquetOpen { }, pruning_predicate, page_pruning_predicate, + row_filter_candidates, + read_plan, }) } } @@ -1055,6 +1263,92 @@ impl BloomFiltersLoadedParquetOpen { } impl RowGroupsPrunedParquetOpen { + /// File-level LIMIT pruning + row-group donation. + /// + /// Runs after stats + bloom pruning so the access plan reflects every + /// pruning decision before `prune_by_limit` picks fully-matched row + /// groups (which requires the whole-file view). Once limit pruning is + /// done, the donor keeps the first surviving row group and pushes + /// each remaining one to the front of the shared queue as a + /// `PartitionedFile` clone whose `extensions` carry a + /// `ParquetOpenChunk` (finalized access plan + pre-loaded metadata). + /// Stealers pop a chunk and start at `BuildStream` directly (see + /// [`ParquetMorselizer::build_stealer_state`]); they never reach + /// this function. + /// + /// No-op when there are fewer than two row groups left in scope, no + /// shared queue is attached, or the caller supplied their own + /// `ParquetAccessPlan`. 
+ fn split_and_donate(mut self) -> Result { + if let (Some(limit), false) = ( + self.prepared.loaded.prepared.limit, + self.prepared.loaded.prepared.preserve_order, + ) { + let rg_metadata = + self.prepared.loaded.reader_metadata.metadata().row_groups(); + self.row_groups.prune_by_limit( + limit, + rg_metadata, + &self.prepared.loaded.prepared.file_metrics, + ); + } + + let Some(shared) = self.prepared.loaded.prepared.shared_work_source.take() else { + return Ok(self); + }; + if let Some(ext) = self.prepared.loaded.prepared.extensions.as_ref() + && ext.is::() + { + return Ok(self); + } + + let eligible: Vec = self.row_groups.row_group_indexes().collect(); + if eligible.len() < 2 { + return Ok(self); + } + + let num_rgs = self + .prepared + .loaded + .reader_metadata + .metadata() + .num_row_groups(); + let single_rg_plan = |idx: usize| -> ParquetAccessPlan { + let mut plan = ParquetAccessPlan::new_none(num_rgs); + plan.scan(idx); + plan + }; + + let make_chunk = |idx: usize| ParquetOpenChunk { + access_plan: single_rg_plan(idx), + reader_metadata: self.prepared.loaded.reader_metadata.clone(), + options: self.prepared.loaded.options.clone(), + physical_file_schema: Arc::clone( + &self.prepared.loaded.prepared.physical_file_schema, + ), + predicate: self.prepared.loaded.prepared.predicate.clone(), + projection: self.prepared.loaded.prepared.projection.clone(), + pruning_predicate: self.prepared.pruning_predicate.clone(), + page_pruning_predicate: self.prepared.page_pruning_predicate.clone(), + row_filter_candidates: self.prepared.row_filter_candidates.clone(), + read_plan: Arc::clone(&self.prepared.read_plan), + }; + + let keep_idx = eligible[0]; + let donated_files: Vec = eligible[1..] 
+ .iter() + .map(|&idx| { + let mut file = self.prepared.loaded.prepared.partitioned_file.clone(); + file.range = None; + file.extensions = Some(Arc::new(make_chunk(idx))); + file + }) + .collect(); + shared.push_morsels(donated_files); + self.row_groups = RowGroupAccessPlanFilter::new(single_rg_plan(keep_idx)); + Ok(self) + } + /// Build the final parquet stream once all pruning work is complete. fn build_stream(self) -> Result>> { let RowGroupsPrunedParquetOpen { @@ -1065,6 +1359,8 @@ impl RowGroupsPrunedParquetOpen { loaded, pruning_predicate: _, page_pruning_predicate, + row_filter_candidates, + read_plan, } = prepared; let MetadataLoadedParquetOpen { prepared, @@ -1075,25 +1371,16 @@ impl RowGroupsPrunedParquetOpen { let file_metadata = Arc::clone(reader_metadata.metadata()); let rg_metadata = file_metadata.row_groups(); - // Filter pushdown: evaluate predicates during scan - let row_filter = if let Some(predicate) = prepared - .pushdown_filters - .then_some(prepared.predicate.clone()) - .flatten() + // Filter pushdown: evaluate predicates during scan. The + // expensive candidate construction ran once in `prepare_filters`; + // here we only do the cheap per-open metric binding. 
+ let row_filter = if let Some(candidates) = row_filter_candidates.as_deref() + && prepared.pushdown_filters { - let row_filter = row_filter::build_row_filter( - &predicate, - &prepared.physical_file_schema, - file_metadata.as_ref(), - prepared.reorder_predicates, - &prepared.file_metrics, - ); - - match row_filter { - Ok(Some(filter)) => Some(filter), - Ok(None) => None, + match row_filter_from_candidates(candidates, &prepared.file_metrics) { + Ok(filter) => Some(filter), Err(e) => { - debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + debug!("Ignoring error building row filter: {e}"); None } } @@ -1133,15 +1420,10 @@ impl RowGroupsPrunedParquetOpen { } let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - let read_plan = build_projection_read_plan( - prepared.projection.expr_iter(), - &prepared.physical_file_schema, - reader_metadata.parquet_schema(), - ); let mut decoder_builder = ParquetPushDecoderBuilder::new_with_metadata(reader_metadata) - .with_projection(read_plan.projection_mask) + .with_projection(read_plan.projection_mask.clone()) .with_batch_size(prepared.batch_size) .with_metrics(arrow_reader_metrics.clone()); @@ -1174,7 +1456,7 @@ impl RowGroupsPrunedParquetOpen { // Check if we need to replace the schema to handle things like differing nullability or metadata. // See note below about file vs. output schema. - let stream_schema = read_plan.projected_schema; + let stream_schema = Arc::clone(&read_plan.projected_schema); let replace_schema = stream_schema != prepared.output_schema; // Rebase column indices to match the narrowed stream schema. 
@@ -1544,15 +1826,21 @@ fn create_initial_plan( row_group_count: usize, ) -> Result { if let Some(extensions) = extensions { - if let Some(access_plan) = extensions.downcast_ref::() { + let access_plan = + if let Some(plan) = extensions.downcast_ref::() { + Some(plan) + } else { + extensions + .downcast_ref::() + .map(|c| &c.access_plan) + }; + if let Some(access_plan) = access_plan { let plan_len = access_plan.len(); if plan_len != row_group_count { return exec_err!( "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}" ); } - - // check row group count matches the plan return Ok(access_plan.clone()); } else { debug!("DataSourceExec Ignoring unknown extension specified for {file_name}"); @@ -1624,6 +1912,7 @@ async fn load_page_index( mod test { use super::*; use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; + use crate::row_group_filter::row_group_start_offset; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess}; use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -1676,6 +1965,7 @@ mod test { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + shared_work_source: Option, } impl ParquetMorselizerBuilder { @@ -1702,9 +1992,17 @@ mod test { max_predicate_cache_size: None, reverse_row_groups: false, preserve_order: false, + shared_work_source: None, } } + /// Attach a shared work source so the built morselizer can donate + /// row-group chunks to sibling streams. + fn with_shared_work_source(mut self, shared: SharedWorkSource) -> Self { + self.shared_work_source = Some(shared); + self + } + /// Set the object store (required for building). 
fn with_store(mut self, store: Arc) -> Self { self.store = Some(store); @@ -1789,6 +2087,11 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; + let output_schema = Arc::new( + projection + .project_schema(table_schema.table_schema()) + .expect("project_schema"), + ); ParquetMorselizer { partition_index: self.partition_index, projection, @@ -1816,6 +2119,8 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + shared_work_source: self.shared_work_source, + output_schema, } } } @@ -2720,4 +3025,224 @@ mod test { "without page index all rows are returned" ); } + + /// Write a 4-row-group file (3 rows per row group) and return + /// `(store, schema, file, data_len)`. + async fn write_four_row_group_file() + -> (Arc, SchemaRef, PartitionedFile, usize) { + let store: Arc = Arc::new(InMemory::new()); + let batches = vec![ + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(), + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(), + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(), + record_batch!(("a", Int32, vec![Some(10), Some(11), Some(12)])).unwrap(), + ]; + let schema = batches[0].schema(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "rg_split.parquet", + batches, + Some(props), + ) + .await; + let file = PartitionedFile::new( + "rg_split.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + (store, schema, file, data_len) + } + + /// Build a single-column morselizer, optionally attached to a shared + /// work source so it can donate. 
+    fn split_test_morselizer( + store: &Arc, + schema: &SchemaRef, + shared: Option<&SharedWorkSource>, + ) -> ParquetMorselizer { + let mut b = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(store)) + .with_schema(Arc::clone(schema)) + .with_projection_indices(&[0]); + if let Some(shared) = shared { + b = b.with_shared_work_source(shared.clone()); + } + b.build() + } + + /// Donor keeps row group 0 and pushes N-1 donated chunks to the + /// shared queue. Donated chunks carry no byte range — each one's + /// `extensions` payload is a `ParquetOpenChunk` whose finalized + /// access plan names its single row group. + #[tokio::test] + async fn row_group_split_donates_remaining_row_groups() { + let (store, schema, file, _) = write_four_row_group_file().await; + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file.clone()).await.unwrap(); + let donor_values = collect_int32_values(stream).await; + assert_eq!( + donor_values, + vec![1, 2, 3], + "donor should read only row group 0" + ); + + // Pop donated chunks off the shared queue and read each. + let mut stolen: Vec> = Vec::new(); + while let Some(donated) = shared.pop_front() { + assert!( + donated.range.is_none(), + "donated chunk should not use byte range — the access plan specifies the row group" + ); + assert!( + donated + .extensions + .as_ref() + .is_some_and(|ext| ext.is::()), + "donated chunk must carry ParquetOpenChunk" + ); + let stealer = split_test_morselizer(&store, &schema, None); + let stream = open_file(&stealer, donated).await.unwrap(); + stolen.push(collect_int32_values(stream).await); + } + assert_eq!( + stolen, + vec![vec![4, 5, 6], vec![7, 8, 9], vec![10, 11, 12]], + "each stealer should get exactly one row group, in file order" + ); + } + + /// A single-row-group file has nothing to donate. 
+    #[tokio::test] + async fn row_group_split_skips_single_row_group_file() { + let store: Arc = Arc::new(InMemory::new()); + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_len = + write_parquet(Arc::clone(&store), "single.parquet", batch.clone()).await; + let file = PartitionedFile::new( + "single.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + let shared = SharedWorkSource::default(); + let schema = batch.schema(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!(values, vec![1, 2, 3]); + assert!( + shared.pop_front().is_none(), + "single-row-group file must not donate" + ); + } + + /// A caller-supplied `ParquetAccessPlan` in `extensions` is respected + /// as-is — no donation happens even if the file has many row groups. + #[tokio::test] + async fn row_group_split_respects_caller_access_plan() { + let (store, schema, file, _) = write_four_row_group_file().await; + let mut caller_plan = ParquetAccessPlan::new_all(4); + caller_plan.skip(0); + caller_plan.skip(2); + let file = file.with_extensions(Arc::new(caller_plan)); + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![4, 5, 6, 10, 11, 12], + "caller plan scans RGs 1 and 3, skipping 0 and 2" + ); + assert!( + shared.pop_front().is_none(), + "caller-supplied access plan must suppress donation" + ); + } + + /// A caller-supplied `file_range` that spans several row groups is + /// still split: range pruning runs before donation, so only row + /// groups whose start offset falls inside the caller's range survive + /// to be donated, and caller intent (byte-range partitioning of the + /// file across planner-level partitions) is preserved. 
+ #[tokio::test] + async fn row_group_split_within_caller_file_range() { + let (store, schema, file, data_len) = write_four_row_group_file().await; + let file = PartitionedFile { + range: Some(datafusion_datasource::FileRange { + start: 0, + end: data_len as i64, + }), + ..file + }; + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let donor_values = collect_int32_values(stream).await; + assert_eq!(donor_values, vec![1, 2, 3]); + + let mut donated_count = 0; + let mut all_stolen: Vec = Vec::new(); + while let Some(donated) = shared.pop_front() { + assert!( + donated + .extensions + .as_ref() + .is_some_and(|ext| ext.is::()), + "donated chunk must carry ParquetOpenChunk" + ); + donated_count += 1; + let stealer = split_test_morselizer(&store, &schema, None); + let stream = open_file(&stealer, donated).await.unwrap(); + all_stolen.extend(collect_int32_values(stream).await); + } + assert_eq!(donated_count, 3); + assert_eq!(all_stolen, vec![4, 5, 6, 7, 8, 9, 10, 11, 12]); + } + + /// A caller-supplied `file_range` that contains only a single row + /// group has nothing to split — no donation should happen. + #[tokio::test] + async fn row_group_split_skips_when_caller_range_covers_single_row_group() { + let (store, schema, file, _) = write_four_row_group_file().await; + // Read metadata to locate row group 1's offset, then make a + // caller range that contains only row group 1. 
+ let reader: Box = + DefaultParquetFileReaderFactory::new(Arc::clone(&store)) + .create_reader(0, file.clone(), None, &ExecutionPlanMetricsSet::new()) + .unwrap(); + let md = ArrowReaderMetadata::load_async( + &mut { reader }, + ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip), + ) + .await + .unwrap(); + let rg1_offset = row_group_start_offset(md.metadata().row_group(1)); + + let file = PartitionedFile { + range: Some(datafusion_datasource::FileRange { + start: rg1_offset, + end: rg1_offset + 1, + }), + ..file + }; + let shared = SharedWorkSource::default(); + + let morselizer = split_test_morselizer(&store, &schema, Some(&shared)); + + let stream = open_file(&morselizer, file).await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!(values, vec![4, 5, 6], "caller range isolates row group 1"); + assert!( + shared.pop_front().is_none(), + "single-row-group caller range must suppress donation" + ); + } } diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index c5c372055826b..0478269db1bec 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -177,6 +177,7 @@ impl ArrowPredicate for DatafusionArrowPredicate { /// of evaluating the resulting expression. /// /// See the module level documentation for more information. +#[derive(Clone)] pub(crate) struct FilterCandidate { expr: Arc, /// Estimate for the total number of bytes that will need to be processed @@ -1019,10 +1020,28 @@ pub fn build_row_filter( reorder_predicates: bool, file_metrics: &ParquetFileMetrics, ) -> Result> { - let rows_pruned = &file_metrics.pushdown_rows_pruned; - let rows_matched = &file_metrics.pushdown_rows_matched; - let time = &file_metrics.row_pushdown_eval_time; + let Some(candidates) = + build_row_filter_candidates(expr, file_schema, metadata, reorder_predicates)? 
+ else { + return Ok(None); + }; + row_filter_from_candidates(&candidates, file_metrics).map(Some) +} +/// Expensive, metrics-free first phase of row-filter construction. +/// +/// Splits the predicate into conjuncts, resolves each one against the file +/// schema and parquet metadata (building [`ProjectionMask`]s and cost +/// estimates), and optionally reorders candidates by estimated cost. The +/// result is the same for every open of a given (predicate, file_schema, +/// file_metadata) triple, so a donor may build it once and share it with +/// sibling stealers. +pub(crate) fn build_row_filter_candidates( + expr: &Arc, + file_schema: &SchemaRef, + metadata: &ParquetMetaData, + reorder_predicates: bool, +) -> Result>> { // Split into conjuncts: // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] let predicates = split_conjunction(expr); @@ -1039,7 +1058,6 @@ pub fn build_row_filter( .flatten() .collect(); - // no candidates if candidates.is_empty() { return Ok(None); } @@ -1047,6 +1065,22 @@ pub fn build_row_filter( if reorder_predicates { candidates.sort_unstable_by_key(|c| c.required_bytes); } + Ok(Some(candidates)) +} + +/// Cheap per-open second phase: wire each candidate up with the current +/// file's row-filter metrics. +/// +/// Called separately from [`build_row_filter_candidates`] so that the +/// expensive candidate construction can be shared across sibling opens +/// of the same file. 
+pub(crate) fn row_filter_from_candidates( + candidates: &[FilterCandidate], + file_metrics: &ParquetFileMetrics, +) -> Result { + let rows_pruned = &file_metrics.pushdown_rows_pruned; + let rows_matched = &file_metrics.pushdown_rows_matched; + let time = &file_metrics.row_pushdown_eval_time; // To avoid double-counting metrics when multiple predicates are used: // - All predicates should count rows_pruned (cumulative pruned rows) @@ -1055,23 +1089,18 @@ pub fn build_row_filter( let total_candidates = candidates.len(); candidates - .into_iter() + .iter() .enumerate() .map(|(idx, candidate)| { let is_last = idx == total_candidates - 1; - - // All predicates share the pruned counter (cumulative) let predicate_rows_pruned = rows_pruned.clone(); - - // Only the last predicate tracks matched rows (final result) let predicate_rows_matched = if is_last { rows_matched.clone() } else { metrics::Count::new() }; - DatafusionArrowPredicate::try_new( - candidate, + candidate.clone(), predicate_rows_pruned, predicate_rows_matched, time.clone(), @@ -1079,7 +1108,7 @@ pub fn build_row_filter( .map(|pred| Box::new(pred) as _) }) .collect::, _>>() - .map(|filters| Some(RowFilter::new(filters))) + .map(RowFilter::new) } #[cfg(test)] diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 3f254c9f55282..3c4961c57858f 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -33,6 +33,17 @@ use parquet::data_type::Decimal; use parquet::schema::types::SchemaDescriptor; use parquet::{bloom_filter::Sbbf, file::metadata::RowGroupMetaData}; +/// Starting byte offset of a row group in its parquet file. +/// +/// Uses the first column's dictionary page offset when present, otherwise its +/// data page offset — intentionally *not* the metadata location, per +/// . 
+pub fn row_group_start_offset(metadata: &RowGroupMetaData) -> i64 { + let col = metadata.column(0); + col.dictionary_page_offset() + .unwrap_or_else(|| col.data_page_offset()) +} + /// Reduces the [`ParquetAccessPlan`] based on row group level metadata. /// /// This struct implements the various types of pruning that are applied to a @@ -224,17 +235,7 @@ impl RowGroupAccessPlanFilter { if !self.access_plan.should_scan(idx) { continue; } - - // Skip the row group if the first dictionary/data page are not - // within the range. - // - // note don't use the location of metadata - // - let col = metadata.column(0); - let offset = col - .dictionary_page_offset() - .unwrap_or_else(|| col.data_page_offset()); - if !range.contains(offset) { + if !range.contains(row_group_start_offset(metadata)) { self.access_plan.skip(idx); } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..2a9332df8b644 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -29,7 +29,7 @@ use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; -use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::file_stream::{FileOpener, SharedWorkSource}; use datafusion_datasource::morsel::Morselizer; use arrow::datatypes::TimeUnit; @@ -526,6 +526,7 @@ impl FileSource for ParquetSource { object_store: Arc, base_config: &FileScanConfig, partition: usize, + shared_work_source: Option, ) -> datafusion_common::Result> { let expr_adapter_factory = base_config .expr_adapter_factory @@ -553,6 +554,10 @@ impl FileSource for ParquetSource { .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); + let output_schema = Arc::new( + self.projection + .project_schema(self.table_schema.table_schema())?, + ); 
Ok(Box::new(ParquetMorselizer { partition_index: partition, projection: self.projection.clone(), @@ -580,6 +585,8 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + shared_work_source, + output_schema, })) } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 9b4ae5827ae8b..d8ecad3db1a11 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; -use crate::file_stream::FileOpener; +use crate::file_stream::{FileOpener, SharedWorkSource}; use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; @@ -82,11 +82,18 @@ pub trait FileSource: Any + Send + Sync { /// /// It is preferred to implement the [`Morselizer`] API directly by /// implementing this method. + /// + /// `shared_work_source`, when `Some`, is the queue of unopened files + /// shared across sibling streams. File sources that can sub-divide a + /// single file into smaller stealable work units (e.g. parquet row-group + /// splitting) may push donated chunks onto it; sources that cannot simply + /// ignore the parameter. 
     fn create_morselizer(
         &self,
         object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         partition: usize,
+        _shared_work_source: Option<SharedWorkSource>,
     ) -> Result<Box<dyn Morselizer>> {
         let opener = self.create_file_opener(object_store, base_config, partition)?;
         Ok(Box::new(FileOpenerMorselizer::new(opener)))
diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs
index 04b74528d5ac1..aeed3c16670df 100644
--- a/datafusion/datasource/src/file_scan_config/mod.rs
+++ b/datafusion/datasource/src/file_scan_config/mod.rs
@@ -597,8 +597,6 @@ impl DataSource for FileScanConfig {
 
         let source = self.file_source.with_batch_size(batch_size);
 
-        let morselizer = source.create_morselizer(object_store, self, partition)?;
-
         // Extract the shared work source from the sibling state if it exists.
         // This allows multiple sibling streams to steal work from a single
         // shared queue of unopened files.
@@ -607,6 +605,13 @@
             .and_then(|state| state.downcast_ref::<SharedWorkSource>())
             .cloned();
 
+        let morselizer = source.create_morselizer(
+            object_store,
+            self,
+            partition,
+            shared_work_source.clone(),
+        )?;
+
         let stream = FileStreamBuilder::new(self)
             .with_partition(partition)
             .with_shared_work_source(shared_work_source)
diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs
index e277690cff810..596c8a78099cb 100644
--- a/datafusion/datasource/src/file_stream/mod.rs
+++ b/datafusion/datasource/src/file_stream/mod.rs
@@ -47,6 +47,7 @@ use self::scan_state::{ScanAndReturn, ScanState};
 
 pub use builder::FileStreamBuilder;
 pub use metrics::{FileStreamMetrics, StartableTime};
+pub use work_source::SharedWorkSource;
 
 /// A stream that iterates record batch by record batch, file over file.
 pub struct FileStream {
diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs
index 21125cd08896c..2f53a3dedfc2c 100644
--- a/datafusion/datasource/src/file_stream/scan_state.rs
+++ b/datafusion/datasource/src/file_stream/scan_state.rs
@@ -26,7 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard;
 use futures::stream::BoxStream;
 use futures::{FutureExt as _, StreamExt as _};
 
-use super::work_source::WorkSource;
+use super::work_source::{FileLease, PopResult, WorkSource};
 use super::{FileStreamMetrics, OnError};
 
 /// State [`FileStreamState::Scan`].
@@ -81,6 +81,12 @@ pub(super) struct ScanState {
     /// Once the I/O completes, yields the next planner and is pushed back
     /// onto `ready_planners`.
     pending_planner: Option,
+    /// Lease on the current file popped from a shared work source. While
+    /// held, idle siblings will wait for potential donations from this
+    /// file instead of declaring the shared source drained. `None` for
+    /// files that came from a local work source or for pre-finalized
+    /// morsels.
+    current_file_lease: Option<FileLease>,
     /// Metrics for the active scan queues.
metrics: FileStreamMetrics, } @@ -102,6 +108,7 @@ impl ScanState { ready_morsels: Default::default(), reader: None, pending_planner: None, + current_file_lease: None, metrics, } } @@ -146,6 +153,7 @@ impl ScanState { return match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -174,6 +182,7 @@ impl ScanState { let batch = batch.slice(0, *remain); let done = 1 + self.work_source.skipped_on_limit(); self.metrics.files_processed.add(done); + self.current_file_lease = None; *remain = 0; (batch, true) } @@ -197,6 +206,7 @@ impl ScanState { return match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -205,6 +215,7 @@ impl ScanState { Poll::Ready(None) => { self.reader = None; self.metrics.files_processed.add(1); + self.current_file_lease = None; self.metrics.time_scanning_until_data.stop(); self.metrics.time_scanning_total.stop(); return ScanAndReturn::Continue; @@ -218,6 +229,11 @@ impl ScanState { self.metrics.time_scanning_until_data.start(); self.metrics.time_scanning_total.start(); self.reader = Some(morsel.into_stream()); + // A morsel is now streaming, so we're past the pre-scan window + // where donations happen. Release the in-flight donor slot so + // idle siblings can make progress (or exit) without waiting on + // this stream to finish its assigned row groups. 
+ self.current_file_lease = None; return ScanAndReturn::Continue; } @@ -248,6 +264,7 @@ impl ScanState { } Ok(None) => { self.metrics.files_processed.add(1); + self.current_file_lease = None; self.metrics.time_opening.stop(); ScanAndReturn::Continue } @@ -257,6 +274,7 @@ impl ScanState { match self.on_error { OnError::Skip => { self.metrics.files_processed.add(1); + self.current_file_lease = None; ScanAndReturn::Continue } OnError::Fail => ScanAndReturn::Error(err), @@ -266,10 +284,18 @@ impl ScanState { } // No outstanding work remains, so begin planning the next unopened file. - let part_file = match self.work_source.pop_front() { - Some(part_file) => part_file, - None => return ScanAndReturn::Done(None), + let (part_file, lease) = match self.work_source.pop_front() { + PopResult::Ready(file, lease) => (file, lease), + PopResult::Pending => { + // A sibling is pre-scan on a shared file that may still + // donate morsels. Re-schedule ourselves so we re-check the + // queues as soon as the scheduler picks us up. 
+                cx.waker().wake_by_ref();
+                return ScanAndReturn::Return(Poll::Pending);
+            }
+            PopResult::Done => return ScanAndReturn::Done(None),
         };
+        self.current_file_lease = lease;
         self.metrics.time_opening.start();
 
         match self.morselizer.plan_file(part_file) {
@@ -283,6 +309,7 @@
                         self.metrics.file_open_errors.add(1);
                         self.metrics.time_opening.stop();
                         self.metrics.files_processed.add(1);
+                        self.current_file_lease = None;
                         ScanAndReturn::Continue
                     }
                     OnError::Fail => ScanAndReturn::Error(err),
diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs
index 7f31dacca9592..85ee9d1f0727d 100644
--- a/datafusion/datasource/src/file_stream/work_source.rs
+++ b/datafusion/datasource/src/file_stream/work_source.rs
@@ -17,6 +17,7 @@
 
 use std::collections::VecDeque;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use crate::PartitionedFile;
 use crate::file_groups::FileGroup;
@@ -37,11 +38,18 @@ pub(super) enum WorkSource {
 }
 
 impl WorkSource {
-    /// Pop the next file to plan from this work source.
-    pub(super) fn pop_front(&mut self) -> Option<PartitionedFile> {
+    /// Try to pop the next item of work.
+    ///
+    /// Returns [`PopResult::Pending`] for [`WorkSource::Shared`] when both
+    /// queues are empty but a sibling is still processing a file that may
+    /// donate more work. The caller must yield with its waker re-scheduled.
+    pub(super) fn pop_front(&mut self) -> PopResult {
         match self {
-            Self::Local(files) => files.pop_front(),
-            Self::Shared(shared) => shared.pop_front(),
+            Self::Local(files) => match files.pop_front() {
+                Some(file) => PopResult::Ready(file, None),
+                None => PopResult::Done,
+            },
+            Self::Shared(shared) => shared.pop_front_tracked(),
         }
     }
 
@@ -55,31 +63,74 @@ impl WorkSource {
     }
 }
 
-/// Shared source of work for sibling `FileStream`s
+/// Outcome of a pop attempt against a [`WorkSource`].
+#[derive(Debug)]
+#[expect(
+    clippy::large_enum_variant,
+    reason = "Ready carries a PartitionedFile on the common path; boxing would add a heap alloc per pop and the other variants are markers"
+)]
+pub(super) enum PopResult {
+    /// Work popped. The optional [`FileLease`] must be held until the file
+    /// is fully processed; while it is alive, idle siblings treat the
+    /// shared source as "donor may still publish" instead of drained.
+    Ready(PartitionedFile, Option<FileLease>),
+    /// No work currently available, but a sibling is still pre-scan on a
+    /// file that may donate. The caller must register its waker to be
+    /// re-polled and yield [`Poll::Pending`].
+    Pending,
+    /// No work available and no donors in flight — fully drained.
+    Done,
+}
+
+/// Shared source of work for sibling `FileStream`s.
+///
+/// Created once per execution and shared by all reorderable sibling streams.
+/// Holds two queues:
 ///
-/// The queue is created once per execution and shared by all reorderable
-/// sibling streams for that execution. Whichever stream becomes idle first may
-/// take the next unopened file from the front of the queue.
+/// - **morsels**: pre-prepared sub-file work items (e.g. parquet row-group
+///   chunks donated mid-open by a sibling). Always popped first.
+/// - **files**: whole unopened files — the initial scan units.
 ///
-/// It uses a [`Mutex`] internally to provide thread-safe access
-/// to the shared file queue.
+/// A FileStream that picks up a morsel has finalized state attached to it
+/// (via `PartitionedFile::extensions`) and can skip most of the per-file
+/// state machine. Draining morsels first keeps their latency low and
+/// prevents siblings from starting fresh whole files while half-processed
+/// sub-file work sits idle.
+///
+/// Also tracks an in-flight donor count: every file popped from `files` is
+/// backed by a [`FileLease`] whose `Drop` decrements the count.
While the
+/// count is non-zero, an idle sibling that sees both queues empty must wait
+/// rather than declare the source drained — the donor is still pre-scan
+/// and may yet push morsels.
 #[derive(Debug, Clone)]
-pub(crate) struct SharedWorkSource {
+pub struct SharedWorkSource {
     inner: Arc<SharedWorkSourceInner>,
 }
 
 #[derive(Debug, Default)]
 pub(super) struct SharedWorkSourceInner {
+    morsels: Mutex<VecDeque<PartitionedFile>>,
     files: Mutex<VecDeque<PartitionedFile>>,
+    /// Number of files popped from `files` whose [`FileLease`] has not yet
+    /// been dropped. Non-zero means "donor may still publish morsels."
+    in_flight: AtomicUsize,
+}
+
+impl Default for SharedWorkSource {
+    fn default() -> Self {
+        Self::new(std::iter::empty())
+    }
 }
 
 impl SharedWorkSource {
     /// Create a shared work source containing the provided unopened files.
     pub(crate) fn new(files: impl IntoIterator<Item = PartitionedFile>) -> Self {
-        let files = files.into_iter().collect();
+        let files: VecDeque<PartitionedFile> = files.into_iter().collect();
         Self {
             inner: Arc::new(SharedWorkSourceInner {
+                morsels: Mutex::new(VecDeque::new()),
                 files: Mutex::new(files),
+                in_flight: AtomicUsize::new(0),
             }),
         }
     }
@@ -89,10 +140,84 @@ impl SharedWorkSource {
         Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned())
     }
 
-    /// Pop the next file from the shared work queue.
+    /// Pop the next item of work — morsels (pre-prepared sub-file chunks)
+    /// first, then whole files.
     ///
-    /// Returns `None` if the queue is empty
-    fn pop_front(&self) -> Option<PartitionedFile> {
+    /// Returns `None` if both queues are empty. Does *not* track in-flight
+    /// donors; intended for tests and callers that only observe morsel
+    /// donations. `ScanState` uses [`Self::pop_front_tracked`].
+    pub fn pop_front(&self) -> Option<PartitionedFile> {
+        if let Some(morsel) = self.inner.morsels.lock().pop_front() {
+            return Some(morsel);
+        }
         self.inner.files.lock().pop_front()
     }
+
+    /// Pop the next item of work for a sibling `FileStream`, returning a
+    /// [`PopResult`] that distinguishes "nothing right now but donors may
+    /// publish" from "truly drained."
+    pub(super) fn pop_front_tracked(&self) -> PopResult {
+        if let Some(morsel) = self.inner.morsels.lock().pop_front() {
+            return PopResult::Ready(morsel, None);
+        }
+        // Increment before releasing the `files` lock so a concurrent peer
+        // cannot observe empty-files && zero-counter while this donor is
+        // about to register itself.
+        let mut files = self.inner.files.lock();
+        if let Some(file) = files.pop_front() {
+            self.inner.in_flight.fetch_add(1, Ordering::Release);
+            drop(files);
+            return PopResult::Ready(
+                file,
+                Some(FileLease {
+                    inner: Arc::clone(&self.inner),
+                }),
+            );
+        }
+        drop(files);
+        // Both queues empty. If any donor is still in flight, wait — it may
+        // yet donate morsels.
+        if self.inner.in_flight.load(Ordering::Acquire) > 0 {
+            return PopResult::Pending;
+        }
+        // Counter observed zero. A donor that donated before dropping its
+        // lease must have pushed morsels before the decrement; re-peek the
+        // morsel queue to pick them up rather than exit prematurely.
+        if let Some(morsel) = self.inner.morsels.lock().pop_front() {
+            return PopResult::Ready(morsel, None);
+        }
+        PopResult::Done
+    }
+
+    /// Push pre-prepared morsels onto the morsel queue.
+    ///
+    /// Used when an in-flight file is sub-divided (e.g. parquet row-group
+    /// splitting): each donated chunk carries its finalized state via
+    /// `PartitionedFile::extensions` so the stealer can skip most of the
+    /// per-file state machine. Items preserve their order.
+    pub fn push_morsels(&self, items: impl IntoIterator<Item = PartitionedFile>) {
+        let mut queue = self.inner.morsels.lock();
+        queue.extend(items);
+    }
+}
+
+/// RAII guard tracking a file popped from a [`SharedWorkSource`]'s `files`
+/// queue. While alive, the counter on the source stays non-zero, which
+/// keeps idle sibling streams waiting for potential donations instead of
+/// declaring the source drained. Dropped when the donor finishes (or
+/// gives up on) the file.
+pub(super) struct FileLease {
+    inner: Arc<SharedWorkSourceInner>,
+}
+
+impl std::fmt::Debug for FileLease {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FileLease").finish_non_exhaustive()
+    }
+}
+
+impl Drop for FileLease {
+    fn drop(&mut self) {
+        self.inner.in_flight.fetch_sub(1, Ordering::Release);
+    }
+}