Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,7 @@ impl TableProvider for ListingTable {
.options
.table_partition_cols
.iter()
.map(|col| {
Ok((
col.0.to_owned(),
self.table_schema
.field_with_name(&col.0)?
.data_type()
.clone(),
))
})
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

let filters = if let Some(expr) = conjunction(filters.to_vec()) {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ mod tests {
statistics: Statistics::new_unknown(&file_schema),
file_schema,
limit: None,
table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
output_ordering: vec![],
infinite_source: false,
});
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;

// Add partition columns
config.table_partition_cols = vec![("date".to_owned(), DataType::Utf8)];
config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
config.file_groups[0][0].partition_values =
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];

Expand Down
54 changes: 45 additions & 9 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct FileScanConfig {
/// all records after filtering are returned.
pub limit: Option<usize>,
/// The partitioning columns
pub table_partition_cols: Vec<(String, DataType)>,
pub table_partition_cols: Vec<Field>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the key change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want to use FieldRef not Field

/// All equivalent lexicographical orderings that describe the schema.
pub output_ordering: Vec<LexOrdering>,
/// Indicates whether this plan may produce an infinite stream of records.
Expand Down Expand Up @@ -135,8 +135,7 @@ impl FileScanConfig {
table_cols_stats.push(self.statistics.column_statistics[idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
let (name, dtype) = &self.table_partition_cols[partition_idx];
table_fields.push(Field::new(name, dtype.to_owned(), false));
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this is where we convert table_partition_cols to Field

// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
Expand Down Expand Up @@ -501,10 +500,10 @@ mod tests {
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
vec![(
to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
)],
)]),
);

let (proj_schema, proj_statistics, _) = conf.project();
Expand All @@ -527,6 +526,35 @@ mod tests {
assert_eq!(col_indices, None);
}

#[test]
fn physical_plan_config_no_projection_tab_cols_as_field() {
    let file_schema = aggr_test_schema();

    // Build a partition column as a full `Field`, with non-default
    // nullability and custom metadata, so we can verify both survive
    // projection unchanged.
    let table_partition_col =
        Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
            .with_metadata(HashMap::from_iter(vec![(
                "key_whatever".to_owned(),
                "value_whatever".to_owned(),
            )]));

    let conf = config_for_projection(
        Arc::clone(&file_schema),
        None,
        Statistics::new_unknown(&file_schema),
        vec![table_partition_col.clone()],
    );

    // Verify the projected schema includes the partition column as the last
    // column, exactly as the field was defined (type, nullability, metadata).
    let (proj_schema, _proj_statistics, _) = conf.project();
    assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
    assert_eq!(
        *proj_schema.field(file_schema.fields().len()),
        table_partition_col,
        "partition columns are the last columns and must have all values defined in created field"
    );
}

#[test]
fn physical_plan_config_with_projection() {
let file_schema = aggr_test_schema();
Expand All @@ -545,10 +573,10 @@ mod tests {
.collect(),
total_byte_size: Precision::Absent,
},
vec![(
to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
)],
)]),
);

let (proj_schema, proj_statistics, _) = conf.project();
Expand Down Expand Up @@ -602,7 +630,7 @@ mod tests {
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
partition_cols.clone(),
to_partition_cols(partition_cols.clone()),
);
let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
Expand Down Expand Up @@ -747,7 +775,7 @@ mod tests {
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
table_partition_cols: Vec<(String, DataType)>,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
Expand All @@ -762,6 +790,14 @@ mod tests {
}
}

/// Convert partition columns from `Vec<(String, DataType)>` to `Vec<Field>`.
///
/// The resulting fields are marked non-nullable, since the tuple form
/// carried no nullability information.
fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
    table_partition_cols
        .into_iter() // consume the Vec so name/dtype can be moved, not cloned
        .map(|(name, dtype)| Field::new(name, dtype, false))
        .collect()
}

/// returns record batch with 3 columns of i32 in memory
pub fn build_table_i32(
a: (&str, &Vec<i32>),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl<F: FileOpener> FileStream<F> {
&config
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.map(|x| x.name().clone())
.collect::<Vec<_>>(),
);

Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,14 +1624,15 @@ mod tests {
projection: Some(vec![0, 1, 2, 12, 13]),
limit: None,
table_partition_cols: vec![
("year".to_owned(), DataType::Utf8),
("month".to_owned(), DataType::UInt8),
(
"day".to_owned(),
Field::new("year", DataType::Utf8, false),
Field::new("month", DataType::UInt8, false),
Field::new(
"day",
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
false,
),
],
output_ordering: vec![],
Expand Down
10 changes: 2 additions & 8 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::ops::Deref;
use std::sync::Arc;

use arrow::compute::SortOptions;
use arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
Expand Down Expand Up @@ -489,13 +488,8 @@ pub fn parse_protobuf_file_scan_config(
let table_partition_cols = proto
.table_partition_cols
.iter()
.map(|col| {
Ok((
col.to_owned(),
schema.field_with_name(col)?.data_type().clone(),
))
})
.collect::<Result<Vec<(String, DataType)>>>()?;
.map(|col| Ok(schema.field_with_name(col)?.clone()))
.collect::<Result<Vec<_>>>()?;

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
table_partition_cols: conf
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.map(|x| x.name().clone())
.collect::<Vec<_>>(),
object_store_url: conf.object_store_url.to_string(),
output_ordering: output_orderings
Expand Down