Merged
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -27,7 +27,10 @@ pub(crate) mod test_util {

use crate::test::object_store::local_unpartitioned_file;

/// Writes `batches` to a temporary parquet file
/// Writes each `batch` to at least one temporary parquet file
///
/// For example, if `batches` contains 2 batches, the function will create
/// 2 temporary files, each containing the contents of one batch
///
/// If multi_page is set to `true`, the parquet file(s) are written
/// with 2 rows per data page (used to test page filtering and
@@ -52,7 +55,7 @@
}
}

// we need the tmp files to be sorted as some tests rely on the how the returning files are ordered
// we need the tmp files to be sorted as some tests rely on the returned file ordering
// https://github.com/apache/datafusion/pull/6629
let tmp_files = {
let mut tmp_files: Vec<_> = (0..batches.len())
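The new doc comment above is easier to follow with a usage sketch. The following is a hypothetical illustration, not part of the PR: it assumes the `store_parquet` signature visible at its call site later in this diff (an async test helper taking `(Vec<RecordBatch>, bool)` and returning object metadata plus temp file handles) and would live inside an async test in the same module. It shows that two input batches yield two temporary parquet files.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;

// Two batches, each written to its own temporary parquet file
let batch1 = RecordBatch::try_from_iter(vec![(
    "c1",
    Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
)])
.unwrap();
let batch2 = RecordBatch::try_from_iter(vec![(
    "c2",
    Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
)])
.unwrap();

// `multi_page = false`: default page layout
let (metas, _tmp_files) = store_parquet(vec![batch1, batch2], false).await.unwrap();
assert_eq!(metas.len(), 2); // one file (and one ObjectMeta) per input batch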
74 changes: 44 additions & 30 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -96,7 +96,11 @@ mod tests {
#[derive(Debug, Default)]
struct RoundTrip {
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
/// Optional logical table schema to use when reading the parquet files
///
/// If None, the logical schema to use will be inferred from the
/// original data via [`Schema::try_merge`]
table_schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
page_index_predicate: bool,
@@ -113,8 +117,11 @@
self
}

fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
/// Specify table schema.
///
/// See [`Self::table_schema`] for more details
fn with_table_schema(mut self, schema: SchemaRef) -> Self {
self.table_schema = Some(schema);
self
}
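As a side note on the fallback described in the `table_schema` doc comment above: when no table schema is given, the round-trip infers one via [`Schema::try_merge`]. A minimal arrow-only sketch of what that merge does (field names chosen for illustration):

use arrow::datatypes::{DataType, Field, Schema};

// Two per-file schemas with different column sets
let file1 = Schema::new(vec![
    Field::new("c1", DataType::Utf8, true),
    Field::new("c2", DataType::Int64, true),
]);
let file2 = Schema::new(vec![
    Field::new("c2", DataType::Int64, true),
    Field::new("c3", DataType::Int8, true),
]);

// The merged (logical) schema is the union of the fields: c1, c2, c3.
// try_merge errors instead if the same column appears with conflicting types.
let table_schema = Schema::try_merge(vec![file1, file2]).unwrap();
assert_eq!(table_schema.fields().len(), 3);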

@@ -146,12 +153,12 @@
self.round_trip(batches).await.batches
}

fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
fn build_file_source(&self, table_schema: SchemaRef) -> Arc<dyn FileSource> {
// set up predicate (this is normally done by a layer higher up)
let predicate = self
.predicate
.as_ref()
.map(|p| logical2physical(p, &file_schema));
.map(|p| logical2physical(p, &table_schema));

let mut source = ParquetSource::default();
if let Some(predicate) = predicate {
@@ -178,7 +185,7 @@
source = source.with_bloom_filter_on_read(false);
}

source.with_schema(Arc::clone(&file_schema))
source.with_schema(Arc::clone(&table_schema))
Review comment (Member):

👍, I didn't notice here before

pub struct ParquetSource {
    /// Options for reading Parquet files
    pub(crate) table_parquet_options: TableParquetOptions,
    /// Optional metrics
    pub(crate) metrics: ExecutionPlanMetricsSet,
    /// The schema of the file.
    /// In particular, this is the schema of the table without partition columns,
    /// *not* the physical schema of the file.
    pub(crate) file_schema: Option<SchemaRef>,

Maybe we can change the file_schema name to a clearer name later.

}

fn build_parquet_exec(
@@ -199,8 +206,14 @@
}

/// run the test, returning the `RoundTripResult`
///
/// Each input batch is written into one or more parquet files (and thus
/// they could potentially have different schemas). The resulting
/// parquet files are then read back and filters are applied to the
Review comment (Member), suggested change:

-    /// parquet files are then read back and filters are applied to the
+    /// parquet files are then read back and filters are applied to them

async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
let file_schema = match &self.schema {
// If table_schema is not set, we need to merge the schema of the
// input batches to get a unified schema.
let table_schema = match &self.table_schema {
Some(schema) => schema,
None => &Arc::new(
Schema::try_merge(
@@ -209,17 +222,16 @@
.unwrap(),
),
};
let file_schema = Arc::clone(file_schema);
// If testing with page_index_predicate, write parquet
// files with multiple pages
let multi_page = self.page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
let file_group: FileGroup = meta.into_iter().map(Into::into).collect();

// build a ParquetExec to return the results
let parquet_source = self.build_file_source(file_schema.clone());
let parquet_source = self.build_file_source(Arc::clone(table_schema));
let parquet_exec = self.build_parquet_exec(
file_schema.clone(),
Arc::clone(table_schema),
file_group.clone(),
Arc::clone(&parquet_source),
);
@@ -229,9 +241,9 @@
false,
// use a new ParquetSource to avoid sharing execution metrics
self.build_parquet_exec(
file_schema.clone(),
Arc::clone(table_schema),
file_group.clone(),
self.build_file_source(file_schema.clone()),
self.build_file_source(Arc::clone(table_schema)),
),
Arc::new(Schema::new(vec![
Field::new("plan_type", DataType::Utf8, true),
@@ -304,7 +316,7 @@
// Thus this predicate will come back as false.
let filter = col("c2").eq(lit(1_i32));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
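For context on why this predicate "comes back as false": the file has no `c2` column, so the schema adapter exposes it as an all-null column of the table type, and comparing null with any value yields null, which does not satisfy the filter. A standalone arrow sketch of that three-valued behavior (illustrative only, not the actual ParquetSource code path):

use arrow::array::{new_null_array, Array, Int32Array};
use arrow::datatypes::DataType;

// The missing column `c2` surfaces as two null rows of the table type
let c2 = new_null_array(&DataType::Int32, 2);
let c2 = c2.as_any().downcast_ref::<Int32Array>().unwrap();

// `c2 = 1` evaluated against nulls is null, never true, so all rows are filtered out
let matches: Vec<Option<bool>> = c2.iter().map(|v| v.map(|x| x == 1)).collect();
assert_eq!(matches, vec![None, None]);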
@@ -323,7 +335,7 @@
// If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -362,7 +374,7 @@
// Thus this predicate will come back as false.
let filter = col("c2").eq(lit("abc"));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -381,7 +393,7 @@
// If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -424,7 +436,7 @@
// Thus this predicate will come back as false.
let filter = col("c2").eq(lit("abc"));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -443,7 +455,7 @@
// If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -486,7 +498,7 @@
// Thus this predicate will come back as false.
let filter = col("c2").eq(lit("abc"));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -505,7 +517,7 @@
// If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -553,7 +565,7 @@
.and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));

let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_table_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
@@ -583,7 +595,7 @@
.or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));

let rt = RoundTrip::new()
.with_schema(table_schema)
.with_table_schema(table_schema)
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch])
@@ -871,13 +883,15 @@
Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
let batch = create_batch(vec![("c1", c1.clone())]);

let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
// Table schema is Utf8 but file schema is StringView
let table_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));

// Predicate should prune all row groups
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema.clone())
.with_table_schema(table_schema.clone())
.round_trip(vec![batch.clone()])
.await;
// There should be no predicate evaluation errors
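The test above depends on the physical StringView column being adapted to the Utf8 type declared in the table schema. The mapping itself is ordinary arrow casting; a standalone sketch of that type adaptation (illustrative, not the exact adapter code):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringArray, StringViewArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;

// Physical column as stored in the parquet file
let physical: ArrayRef = Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));

// Logical (table) type requested by the scan
let logical = cast(&physical, &DataType::Utf8).unwrap();
let logical = logical.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(logical.value(0), "foo");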
@@ -890,7 +904,7 @@
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema)
.with_table_schema(table_schema)
.round_trip(vec![batch])
.await;
// There should be no predicate evaluation errors
@@ -912,14 +926,14 @@
let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
let batch = create_batch(vec![("c1", c1.clone())]);

let schema =
let table_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));

// Predicate should prune all row groups
let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema.clone())
.with_table_schema(table_schema.clone())
.round_trip(vec![batch.clone()])
.await;
// There should be no predicate evaluation errors
@@ -931,7 +945,7 @@
let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
let rt = RoundTrip::new()
.with_predicate(filter)
.with_schema(schema)
.with_table_schema(table_schema)
.round_trip(vec![batch])
.await;
// There should be no predicate evaluation errors
@@ -1183,15 +1197,15 @@
// batch2: c3(int8), c2(int64), c1(string), c4(string)
let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);

let schema = Schema::new(vec![
let table_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int8, true),
]);

// read/write the files:
let read = RoundTrip::new()
.with_schema(Arc::new(schema))
.with_table_schema(Arc::new(table_schema))
.round_trip_to_batches(vec![batch1, batch2])
.await;
assert_contains!(read.unwrap_err().to_string(),
10 changes: 8 additions & 2 deletions datafusion/datasource/src/file_scan_config.rs
@@ -232,9 +232,15 @@ pub struct FileScanConfig {
pub struct FileScanConfigBuilder {
object_store_url: ObjectStoreUrl,
/// Table schema before any projections or partition columns are applied.
/// This schema is used to read the files, but is **not** necessarily the schema of the physical files.
/// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the
///
/// This schema is used to read the files, but is **not** necessarily the
/// schema of the physical files. Rather this is the schema that the
/// physical file schema will be mapped onto, and the schema that the
/// [`DataSourceExec`] will return.
///
/// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
///
/// This probably would be better named `table_schema`
Review comment (Member):

Yes, yes!!

file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,

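To make the distinction in this doc comment concrete, here is a small hypothetical sketch (field and partition names invented for illustration) of how the builder's `file_schema` relates to the full table schema with partition columns:

use arrow::datatypes::{DataType, Field, Schema};

// Full table schema as exposed by a hypothetical TableProvider,
// including a Hive-style partition column `date`.
let partition_cols = ["date"];
let table_schema = Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    Field::new("value", DataType::Utf8, true),
    Field::new("date", DataType::Utf8, false),
]);

// What this builder calls `file_schema`: the table schema with the
// partition columns removed. Individual parquet files may still use
// different physical types (e.g. StringView instead of Utf8) and are
// mapped onto this schema when read.
let file_schema = Schema::new(
    table_schema
        .fields()
        .iter()
        .filter(|f| !partition_cols.contains(&f.name().as_str()))
        .cloned()
        .collect::<Vec<_>>(),
);
assert_eq!(file_schema.fields().len(), 2);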