diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 82774a6e831c9..96b3adf968b85 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -177,7 +177,7 @@ impl ExecutionPlan for ArrowExec { let opener = ArrowOpener { object_store, - projection: self.base_config.projection.clone(), + projection: self.base_config.file_column_projection_indices(), }; let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?; diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 4a814c5b9b2c5..370ca91a0b0e9 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -157,6 +157,22 @@ impl FileScanConfig { }) } + /// Projects only file schema, ignoring partition columns + pub(crate) fn projected_file_schema(&self) -> SchemaRef { + let fields = self.file_column_projection_indices().map(|indices| { + indices + .iter() + .map(|col_idx| self.file_schema.field(*col_idx)) + .cloned() + .collect::<Vec<_>>() + }); + + fields.map_or_else( + || Arc::clone(&self.file_schema), + |f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())), + ) + } + pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> { self.projection.as_ref().map(|p| { p.iter() @@ -686,6 +702,66 @@ mod tests { crate::assert_batches_eq!(expected, &[projected_batch]); } + #[test] + fn test_projected_file_schema_with_partition_col() { + let schema = aggr_test_schema(); + let partition_cols = vec![ + ( + "part1".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ( + "part2".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ]; + + // Projected file schema for config with projection including partition column 
+ let projection = config_for_projection( + schema.clone(), + Some(vec![0, 3, 5, schema.fields().len()]), + Statistics::new_unknown(&schema), + to_partition_cols(partition_cols.clone()), + ) + .projected_file_schema(); + + // Assert partition column filtered out in projected file schema + let expected_columns = vec!["c1", "c4", "c6"]; + let actual_columns = projection + .fields() + .iter() + .map(|f| f.name().clone()) + .collect::<Vec<_>>(); + assert_eq!(expected_columns, actual_columns); + } + + #[test] + fn test_projected_file_schema_without_projection() { + let schema = aggr_test_schema(); + let partition_cols = vec![ + ( + "part1".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ( + "part2".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ]; + + // Projected file schema for config without projection + let projection = config_for_projection( + schema.clone(), + None, + Statistics::new_unknown(&schema), + to_partition_cols(partition_cols.clone()), + ) + .projected_file_schema(); + + // Assert projected file schema is equal to file schema + assert_eq!(projection.fields(), schema.fields()); + } + + // sets default for configs that play no role in projections fn config_for_projection( file_schema: SchemaRef, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 62b96ea3aefbd..ca466b5c6a922 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -174,14 +174,13 @@ impl ExecutionPlan for NdJsonExec { context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { let batch_size = context.session_config().batch_size(); - let (projected_schema, ..) 
= self.base_config.project(); let object_store = context .runtime_env() .object_store(&self.base_config.object_store_url)?; let opener = JsonOpener { batch_size, - projected_schema, + projected_schema: self.base_config.projected_file_schema(), file_compression_type: self.file_compression_type.to_owned(), object_store, }; diff --git a/datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow new file mode 100644 index 0000000000000..48151a2ed2408 Binary files /dev/null and b/datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow differ diff --git a/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow new file mode 100644 index 0000000000000..be932c7f656a6 Binary files /dev/null and b/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow differ diff --git a/datafusion/core/tests/data/partitioned_table_json/part=1/data.json b/datafusion/core/tests/data/partitioned_table_json/part=1/data.json new file mode 100644 index 0000000000000..466c5b3dc4abc --- /dev/null +++ b/datafusion/core/tests/data/partitioned_table_json/part=1/data.json @@ -0,0 +1,2 @@ +{"id": 1, "value": "foo"} +{"id": 2, "value": "bar"} diff --git a/datafusion/core/tests/data/partitioned_table_json/part=2/data.json b/datafusion/core/tests/data/partitioned_table_json/part=2/data.json new file mode 100644 index 0000000000000..857d70e1f397d --- /dev/null +++ b/datafusion/core/tests/data/partitioned_table_json/part=2/data.json @@ -0,0 +1,2 @@ +{"id": 3, "value": "baz"} +{"id": 4, "value": "qux"} diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index 5c1b6fb726ed7..8cf3550fdb25d 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -42,3 +42,74 @@ SELECT * FROM 
arrow_simple 2 bar NULL 3 baz false 4 NULL true + +# ARROW partitioned table +statement ok +CREATE EXTERNAL TABLE arrow_partitioned ( + part Int, + f0 Bigint, + f1 String, + f2 Boolean +) +STORED AS ARROW +LOCATION '../core/tests/data/partitioned_table_arrow/' +PARTITIONED BY (part); + +# select wildcard +query ITBI +SELECT * FROM arrow_partitioned ORDER BY f0; +---- +1 foo true 123 +2 bar false 123 +3 baz true 456 +4 NULL NULL 456 + +# select all fields +query IITB +SELECT part, f0, f1, f2 FROM arrow_partitioned ORDER BY f0; +---- +123 1 foo true +123 2 bar false +456 3 baz true +456 4 NULL NULL + +# select without partition column +query IB +SELECT f0, f2 FROM arrow_partitioned ORDER BY f0 +---- +1 true +2 false +3 true +4 NULL + +# select only partition column +query I +SELECT part FROM arrow_partitioned ORDER BY part +---- +123 +123 +456 +456 + +# select without any table-related columns in projection +query I +SELECT 1 FROM arrow_partitioned +---- +1 +1 +1 +1 + +# select with partition filter +query I +SELECT f0 FROM arrow_partitioned WHERE part = 123 ORDER BY f0 +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456 +---- +logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)] +physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0] diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index c0f86ac76320a..0082f2ecefb96 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -99,9 +99,11 @@ select * from dictionary_encoded_arrow_test_readback; ---- b -# https://github.com/apache/arrow-datafusion/issues/7816 -query error DataFusion error: Arrow error: Schema 
error: project index 1 out of bounds, max field 1 +query TT select * from dictionary_encoded_arrow_partitioned order by (a); +---- +a foo +b bar # test_insert_into @@ -195,9 +197,15 @@ INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2), ---- 6 -# Issue open for this error: https://github.com/apache/arrow-datafusion/issues/7816 -query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \} +query TT select * from partitioned_insert_test_json order by a,b +---- +1 2 +1 2 +3 4 +3 4 +5 6 +5 6 statement ok CREATE EXTERNAL TABLE diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index c0d5e895f0f2e..24c97816fe7fd 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -68,3 +68,74 @@ DROP TABLE json_test statement ok DROP TABLE single_nan + +# JSON partitioned table +statement ok +CREATE EXTERNAL TABLE json_partitioned_test ( + part Int, + id Int, + value String, +) +STORED AS JSON +LOCATION '../core/tests/data/partitioned_table_json' +PARTITIONED BY (part); + +# select wildcard always returns partition columns as the last ones +query ITI +SELECT * FROM json_partitioned_test ORDER BY id +---- +1 foo 1 +2 bar 1 +3 baz 2 +4 qux 2 + + +# select all fields +query IIT +SELECT part, id, value FROM json_partitioned_test ORDER BY id +---- +1 1 foo +1 2 bar +2 3 baz +2 4 qux + +# select without partition column +query I +SELECT id FROM json_partitioned_test ORDER BY id +---- +1 +2 +3 +4 + +# select only partition column +query I +SELECT part FROM json_partitioned_test ORDER BY part +---- +1 +1 +2 +2 + +# select without any table-related columns in projection +query T +SELECT 'x' FROM json_partitioned_test +---- +x +x +x +x + +# select with partition filter +query I +SELECT id FROM 
json_partitioned_test WHERE part = 1 ORDER BY id +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2 +---- +logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)] +physical_plan JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id]