2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -177,7 +177,7 @@ impl ExecutionPlan for ArrowExec {

         let opener = ArrowOpener {
             object_store,
-            projection: self.base_config.projection.clone(),
+            projection: self.base_config.file_column_projection_indices(),
         };
         let stream =
             FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
76 changes: 76 additions & 0 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -157,6 +157,22 @@ impl FileScanConfig {
})
}

Contributor: This method looks good, but it would be nice if we could leverage file_column_projection_indices (which CsvOpener uses) so we aren't duplicating the logic to exclude the partition columns. I think in general we could make Csv, Json, and Arrow file opening / configuring more consistent. We can cut follow-on tickets for this.

Contributor (author): Fixed -- this method now returns a schema whose fields are collected by iterating over the result of file_column_projection_indices.

    /// Projects only file schema, ignoring partition columns
    pub(crate) fn projected_file_schema(&self) -> SchemaRef {
let fields = self.file_column_projection_indices().map(|indices| {
indices
.iter()
.map(|col_idx| self.file_schema.field(*col_idx))
.cloned()
.collect::<Vec<_>>()
});

fields.map_or_else(
|| Arc::clone(&self.file_schema),
|f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())),
)
}

pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
self.projection.as_ref().map(|p| {
p.iter()
                // indices at or beyond the file schema length refer to
                // partition columns, which do not exist in the file itself
                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
                .copied()
                .collect()
        })
    }

@@ -686,6 +702,66 @@ mod tests {
crate::assert_batches_eq!(expected, &[projected_batch]);
}

#[test]
fn test_projected_file_schema_with_partition_col() {
let schema = aggr_test_schema();
let partition_cols = vec![
(
"part1".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"part2".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];

// Projected file schema for config with projection including partition column
let projection = config_for_projection(
schema.clone(),
Some(vec![0, 3, 5, schema.fields().len()]),
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols.clone()),
)
.projected_file_schema();

// Assert partition column filtered out in projected file schema
let expected_columns = vec!["c1", "c4", "c6"];
let actual_columns = projection
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<Vec<_>>();
assert_eq!(expected_columns, actual_columns);
}

#[test]
fn test_projected_file_schema_without_projection() {
let schema = aggr_test_schema();
let partition_cols = vec![
(
"part1".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"part2".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];

// Projected file schema for config without projection
let projection = config_for_projection(
schema.clone(),
None,
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols.clone()),
)
.projected_file_schema();

// Assert projected file schema is equal to file schema
assert_eq!(projection.fields(), schema.fields());
}

    // Sets defaults for config options that play no role in projections
fn config_for_projection(
file_schema: SchemaRef,
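For context: before this change, ArrowOpener received the table-level projection, whose partition-column indices fall past the end of the file schema -- the source of the "project index ... out of bounds" error whose regression test is updated in insert_to_external.slt below. The following is a minimal, self-contained sketch of the core idea, using only the arrow_schema crate; the helper name file_only_indices is illustrative, not a DataFusion API.

use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Illustrative helper (not DataFusion's): keep only projection indices that
// point into the file schema. Partition columns are appended after the file
// columns in the table schema, so any larger index refers to one of them.
fn file_only_indices(projection: Option<&[usize]>, file_field_count: usize) -> Option<Vec<usize>> {
    projection.map(|p| {
        p.iter()
            .copied()
            .filter(|idx| *idx < file_field_count)
            .collect()
    })
}

fn main() {
    let file_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("value", DataType::Utf8, true),
    ]));

    // Table schema is [id, value, part]; index 2 is the partition column.
    let projection = Some(vec![0usize, 2]);
    let indices = file_only_indices(projection.as_deref(), file_schema.fields().len());
    assert_eq!(indices, Some(vec![0]));

    // Project the *file* schema from the surviving indices, analogous to
    // what projected_file_schema() does above.
    let projected = Schema::new(
        indices
            .unwrap()
            .into_iter()
            .map(|i| file_schema.field(i).clone())
            .collect::<Vec<_>>(),
    );
    assert_eq!(projected.field(0).name(), "id");
}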
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -174,14 +174,13 @@ impl ExecutionPlan for NdJsonExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let batch_size = context.session_config().batch_size();
-        let (projected_schema, ..) = self.base_config.project();

         let object_store = context
             .runtime_env()
             .object_store(&self.base_config.object_store_url)?;
         let opener = JsonOpener {
             batch_size,
-            projected_schema,
+            projected_schema: self.base_config.projected_file_schema(),
             file_compression_type: self.file_compression_type.to_owned(),
             object_store,
         };
datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow
Binary file not shown.
datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow
Binary file not shown.
datafusion/core/tests/data/partitioned_table_json/part=1/data.json
@@ -0,0 +1,2 @@
{"id": 1, "value": "foo"}
{"id": 2, "value": "bar"}
datafusion/core/tests/data/partitioned_table_json/part=2/data.json
@@ -0,0 +1,2 @@
{"id": 3, "value": "baz"}
{"id": 4, "value": "qux"}
71 changes: 71 additions & 0 deletions datafusion/sqllogictest/test_files/arrow_files.slt
@@ -42,3 +42,74 @@ SELECT * FROM arrow_simple
2 bar NULL
3 baz false
4 NULL true

# ARROW partitioned table
statement ok
CREATE EXTERNAL TABLE arrow_partitioned (
part Int,
f0 Bigint,
f1 String,
f2 Boolean
)
STORED AS ARROW
LOCATION '../core/tests/data/partitioned_table_arrow/'
PARTITIONED BY (part);

# select wildcard
query ITBI
SELECT * FROM arrow_partitioned ORDER BY f0;
----
1 foo true 123
2 bar false 123
3 baz true 456
4 NULL NULL 456

# select all fields
query IITB
SELECT part, f0, f1, f2 FROM arrow_partitioned ORDER BY f0;
----
123 1 foo true
123 2 bar false
456 3 baz true
456 4 NULL NULL

# select without partition column
query IB
SELECT f0, f2 FROM arrow_partitioned ORDER BY f0
----
1 true
2 false
3 true
4 NULL

# select only partition column
query I
SELECT part FROM arrow_partitioned ORDER BY part
----
123
123
456
456

# select without any table-related columns in projection
query I
SELECT 1 FROM arrow_partitioned
----
1
1
1
1

# select with partition filter
query I
SELECT f0 FROM arrow_partitioned WHERE part = 123 ORDER BY f0
----
1
2

# select with partition filter should scan only one directory
query TT
EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456
----
logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)]
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0]
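The same partitioned scan can also be exercised from the Rust API. A rough sketch, assuming datafusion and tokio as dependencies and a process started from the repository root (error handling kept minimal):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Same DDL as the sqllogictest above; the LOCATION path assumes the
    // working directory is the repository root.
    ctx.sql(
        "CREATE EXTERNAL TABLE arrow_partitioned (part INT, f0 BIGINT, f1 STRING, f2 BOOLEAN) \
         STORED AS ARROW \
         LOCATION 'datafusion/core/tests/data/partitioned_table_arrow/' \
         PARTITIONED BY (part)",
    )
    .await?;

    // The partition filter prunes the scan to the part=456 directory
    // before any file is opened.
    ctx.sql("SELECT f0 FROM arrow_partitioned WHERE part = 456")
        .await?
        .show()
        .await?;
    Ok(())
}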
16 changes: 12 additions & 4 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -99,9 +99,11 @@ select * from dictionary_encoded_arrow_test_readback;
----
b

-# https://github.com/apache/arrow-datafusion/issues/7816
-query error DataFusion error: Arrow error: Schema error: project index 1 out of bounds, max field 1
Contributor: Very nice! Glad to see the fix was essentially the same for arrow files.

+query TT
+select * from dictionary_encoded_arrow_partitioned order by (a);
+----
+a foo
+b bar


# test_insert_into
@@ -195,9 +197,15 @@ INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2),
----
6

-# Issue open for this error: https://github.com/apache/arrow-datafusion/issues/7816
-query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}
+query TT
+select * from partitioned_insert_test_json order by a,b
Contributor: 🥳

+----
+1 2
+1 2
+3 4
+3 4
+5 6
+5 6

statement ok
CREATE EXTERNAL TABLE
71 changes: 71 additions & 0 deletions datafusion/sqllogictest/test_files/json.slt
@@ -68,3 +68,74 @@ DROP TABLE json_test

statement ok
DROP TABLE single_nan

# JSON partitioned table
statement ok
CREATE EXTERNAL TABLE json_partitioned_test (
part Int,
id Int,
value String,
)
STORED AS JSON
LOCATION '../core/tests/data/partitioned_table_json'
PARTITIONED BY (part);

# a wildcard select always returns partition columns last
query ITI
SELECT * FROM json_partitioned_test ORDER BY id
----
1 foo 1
2 bar 1
3 baz 2
4 qux 2


# select all fields
query IIT
SELECT part, id, value FROM json_partitioned_test ORDER BY id
----
1 1 foo
1 2 bar
2 3 baz
2 4 qux

# select without partition column
query I
SELECT id FROM json_partitioned_test ORDER BY id
----
1
2
3
4

# select only partition column
query I
SELECT part FROM json_partitioned_test ORDER BY part
----
1
1
2
2

# select without any table-related columns in projection
query T
SELECT 'x' FROM json_partitioned_test
----
x
x
x
x

# select with partition filter
query I
SELECT id FROM json_partitioned_test WHERE part = 1 ORDER BY id
----
1
2

# select with partition filter should scan only one directory
query TT
EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2
----
logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)]
physical_plan JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id]