From 384b27778706b8e8b0e45344646d4eed397ec714 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sat, 2 Mar 2024 15:27:18 +0200 Subject: [PATCH 1/4] fix partitioned table reading for json --- .../physical_plan/file_scan_config.rs | 76 +++++++++++++++++++ .../core/src/datasource/physical_plan/json.rs | 3 +- .../partitioned_table_json/part=1/data.json | 2 + .../partitioned_table_json/part=2/data.json | 2 + .../test_files/insert_to_external.slt | 10 ++- datafusion/sqllogictest/test_files/json.slt | 71 +++++++++++++++++ 6 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 datafusion/core/tests/data/partitioned_table_json/part=1/data.json create mode 100644 datafusion/core/tests/data/partitioned_table_json/part=2/data.json diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 4a814c5b9b2c5..2f235a93fbc41 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -157,6 +157,22 @@ impl FileScanConfig { }) } + /// Projects only file schema, ignoring partition columns + pub(crate) fn projected_file_schema(&self) -> SchemaRef { + let fields = self.projection.as_ref().map(|proj| { + proj.iter() + .filter(|col_idx| **col_idx < self.file_schema.fields().len()) + .map(|col_idx| self.file_schema.field(*col_idx)) + .cloned() + .collect::>() + }); + + fields.map_or_else( + || Arc::clone(&self.file_schema), + |f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())), + ) + } + pub(crate) fn file_column_projection_indices(&self) -> Option> { self.projection.as_ref().map(|p| { p.iter() @@ -686,6 +702,66 @@ mod tests { crate::assert_batches_eq!(expected, &[projected_batch]); } + #[test] + fn test_projected_file_schema_with_partition_col() { + let schema = aggr_test_schema(); + let partition_cols = vec![ + ( + "part1".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ( + "part2".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ]; + + // Projected file schema for config with projection including partition column + let projection = config_for_projection( + schema.clone(), + Some(vec![0, 3, 5, schema.fields().len()]), + Statistics::new_unknown(&schema), + to_partition_cols(partition_cols.clone()), + ) + .projected_file_schema(); + + // Assert partition column filtered out in projected file schema + let expected_columns = vec!["c1", "c4", "c6"]; + let actual_columns = projection + .fields() + .iter() + .map(|f| f.name().clone()) + .collect::>(); + assert_eq!(expected_columns, actual_columns); + } + + #[test] + fn test_projected_file_schema_without_projection() { + let schema = aggr_test_schema(); + let partition_cols = vec![ + ( + "part1".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ( + "part2".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ]; + + // Projected file schema for config without projection + let projection = config_for_projection( + schema.clone(), + None, + Statistics::new_unknown(&schema), + to_partition_cols(partition_cols.clone()), + ) + .projected_file_schema(); + + // Assert projected file schema is equal to file schema + assert_eq!(projection.fields(), schema.fields()); + } + // sets default for configs that play no role in projections fn config_for_projection( file_schema: SchemaRef, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 62b96ea3aefbd..ca466b5c6a922 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -174,14 +174,13 @@ impl ExecutionPlan for NdJsonExec { context: Arc, ) -> Result { let batch_size = context.session_config().batch_size(); - let (projected_schema, ..) = self.base_config.project(); let object_store = context .runtime_env() .object_store(&self.base_config.object_store_url)?; let opener = JsonOpener { batch_size, - projected_schema, + projected_schema: self.base_config.projected_file_schema(), file_compression_type: self.file_compression_type.to_owned(), object_store, }; diff --git a/datafusion/core/tests/data/partitioned_table_json/part=1/data.json b/datafusion/core/tests/data/partitioned_table_json/part=1/data.json new file mode 100644 index 0000000000000..466c5b3dc4abc --- /dev/null +++ b/datafusion/core/tests/data/partitioned_table_json/part=1/data.json @@ -0,0 +1,2 @@ +{"id": 1, "value": "foo"} +{"id": 2, "value": "bar"} diff --git a/datafusion/core/tests/data/partitioned_table_json/part=2/data.json b/datafusion/core/tests/data/partitioned_table_json/part=2/data.json new file mode 100644 index 0000000000000..857d70e1f397d --- /dev/null +++ b/datafusion/core/tests/data/partitioned_table_json/part=2/data.json @@ -0,0 +1,2 @@ +{"id": 3, "value": "baz"} +{"id": 4, "value": "qux"} diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index c0f86ac76320a..2e10862bac667 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -195,9 +195,15 @@ INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2), ---- 6 -# Issue open for this error: https://github.com/apache/arrow-datafusion/issues/7816 -query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \} +query TT select * from partitioned_insert_test_json order by a,b +---- +1 2 +1 2 +3 4 +3 4 +5 6 +5 6 statement ok CREATE EXTERNAL TABLE diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index c0d5e895f0f2e..01b330f24c53f 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -68,3 +68,74 @@ DROP TABLE json_test statement ok DROP TABLE single_nan + +# JSON partitioned table +statement ok +CREATE EXTERNAL TABLE json_partitioned_test ( + part Int, + id Int, + value String, +) +STORED AS JSON +LOCATION '../core/tests/data/partitioned_table_json' +PARTITIONED BY (part); + +# select wildcard always returns partition columns as the last ones +query ITI +SELECT * FROM json_partitioned_test ORDER BY id +---- +1 foo 1 +2 bar 1 +3 baz 2 +4 qux 2 + + +# select all fields +query IIT +SELECT part, id, value FROM json_partitioned_test ORDER BY id +---- +1 1 foo +1 2 bar +2 3 baz +2 4 qux + +# select without partition column +query I +SELECT id FROM json_partitioned_test ORDER BY id +---- +1 +2 +3 +4 + +# select only partition column +query I +SELECT part FROM json_partitioned_test ORDER BY part +---- +1 +1 +2 +2 + +# select without any table-relates columns in projection +query T +SELECT 'x' FROM json_partitioned_test +---- +x +x +x +x + +# select with partition filter +query I +SELECT id FROM json_partitioned_test WHERE part = 1 ORDER BY id +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2 +---- +logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)] +physical_plan JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id] From d12a355a6a91ae01fa6e84f7cd7f7bdba5925acf Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sat, 2 Mar 2024 19:02:34 +0200 Subject: [PATCH 2/4] wildcard projection test for csv partitioned table --- datafusion/sqllogictest/test_files/ddl.slt | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 682972b5572a9..09ba75be8cbb2 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -490,20 +490,28 @@ DROP TABLE csv_with_timestamps statement ok CREATE EXTERNAL TABLE csv_with_timestamps ( - name VARCHAR, - ts TIMESTAMP, c_date DATE, + name VARCHAR, + ts TIMESTAMP ) STORED AS CSV PARTITIONED BY (c_date) LOCATION '../core/tests/data/partitioned_table'; +# select wildcard always returns partition columns as the last ones query TPD SELECT * from csv_with_timestamps where c_date='2018-11-13' ---- Jorge 2018-12-13T12:12:10.011 2018-11-13 Andrew 2018-11-13T17:11:10.011 2018-11-13 +# select all fields explicitly +query DTP +SELECT c_date, name, ts from csv_with_timestamps where c_date='2018-11-13' +---- +2018-11-13 Jorge 2018-12-13T12:12:10.011 +2018-11-13 Andrew 2018-11-13T17:11:10.011 + statement ok DROP TABLE csv_with_timestamps From 7be4bf858d9680ad42fb457b05a1ec06226bb891 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sun, 3 Mar 2024 16:43:07 +0200 Subject: [PATCH 3/4] review comments --- .../src/datasource/physical_plan/file_scan_config.rs | 6 +++--- datafusion/sqllogictest/test_files/ddl.slt | 12 ++---------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 2f235a93fbc41..370ca91a0b0e9 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -159,9 +159,9 @@ impl FileScanConfig { /// Projects only file schema, ignoring partition columns pub(crate) fn projected_file_schema(&self) -> SchemaRef { - let fields = self.projection.as_ref().map(|proj| { - proj.iter() - .filter(|col_idx| **col_idx < self.file_schema.fields().len()) + let fields = self.file_column_projection_indices().map(|indices| { + indices + .iter() .map(|col_idx| self.file_schema.field(*col_idx)) .cloned() .collect::>() diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 09ba75be8cbb2..682972b5572a9 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -490,28 +490,20 @@ DROP TABLE csv_with_timestamps statement ok CREATE EXTERNAL TABLE csv_with_timestamps ( - c_date DATE, name VARCHAR, - ts TIMESTAMP + ts TIMESTAMP, + c_date DATE, ) STORED AS CSV PARTITIONED BY (c_date) LOCATION '../core/tests/data/partitioned_table'; -# select wildcard always returns partition columns as the last ones query TPD SELECT * from csv_with_timestamps where c_date='2018-11-13' ---- Jorge 2018-12-13T12:12:10.011 2018-11-13 Andrew 2018-11-13T17:11:10.011 2018-11-13 -# select all fields explicitly -query DTP -SELECT c_date, name, ts from csv_with_timestamps where c_date='2018-11-13' ----- -2018-11-13 Jorge 2018-12-13T12:12:10.011 -2018-11-13 Andrew 2018-11-13T17:11:10.011 - statement ok DROP TABLE csv_with_timestamps From 33814f9a812084753dd0a8412ca7346751c50c91 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sun, 3 Mar 2024 18:28:02 +0200 Subject: [PATCH 4/4] fix partitioned arrow tables reading --- .../datasource/physical_plan/arrow_file.rs | 2 +- .../part=123/data.arrow | Bin 0 -> 2258 bytes .../part=456/data.arrow | Bin 0 -> 2306 bytes .../sqllogictest/test_files/arrow_files.slt | 71 ++++++++++++++++++ .../test_files/insert_to_external.slt | 6 +- datafusion/sqllogictest/test_files/json.slt | 2 +- 6 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow create mode 100644 datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 82774a6e831c9..96b3adf968b85 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -177,7 +177,7 @@ impl ExecutionPlan for ArrowExec { let opener = ArrowOpener { object_store, - projection: self.base_config.projection.clone(), + projection: self.base_config.file_column_projection_indices(), }; let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?; diff --git a/datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow/part=123/data.arrow new file mode 100644 index 0000000000000000000000000000000000000000..48151a2ed2408b872b9929dd76a4e47cc8895020 GIT binary patch literal 2258 zcmeHJL2DC16rQwc8cHZR^w2|vG9q}0AqgrX9@>jHQHtV0Bu z!J|iifIr5A|3$xVW;SHE0TDdrmC4Ne-kbN{zIpG_^NWk;ue(D023_q6(G;WuaVTnH zPtjt0j77YTU&`wC>}=I=j#V^LsSqjj=Q4;!`l>%P;dBzE^6}}L zx%>zWCz+_|Qp?ts#a6n=M)y2Ra<} zi^qa2N~{mCTsmc&3uXe5S2WI!OZ)5$!%`ciJ6^97ykYP1tMq z+FiL2C~NT^tA)jS{efCX)z(<+M;^l_P@ZDk6XF--Kb_7Vb&j!kk7RYwFID@;8b3n4 z*TlZb8}>UMyIYdw{B4iF@cS*l$0hxoXU=kB{@3Fn{x`nqFIVNW z<9BCY4#CO_Ce5;Rk#mRS^}sI4(j`aFigL#5fnAP`8K}f-aE6h*9PH9UC4bZ>&(KY@%n;yd%gH9dox-r)LsImv;&yPcui8M>XJoJ{{`hJHieo!kFq{s5qe B(yRag literal 0 HcmV?d00001 diff --git a/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow new file mode 100644 index 0000000000000000000000000000000000000000..be932c7f656a63a3def6ad9849cb68095028e17d GIT binary patch literal 2306 zcmeHJv2GJV5M9R?$BL}TMT!Weh!&72jAdI8MO03c4k1M#3WUtsK8J(zopnA#9LthL zl~16|2T)SvQz-bC@ZRp;ad&ZpMCq~icIM6O%I6E?n6-l7Sk#0D_tCLz>1aiXpZ%A5Oq_H0zRWFW*Bj{(9QEYu)C(j6ByL z_VSp?tQXHG3@2xSHwcF&gV}g8PZeSRlRJo++1QQMP{j)73+V4{us zlF4TwM=h?;Wnr;G7G4k^Khssn<&eUe6j-2T1DV)u$x(Yv5RDar{M%OV$LXwzqp?Dq zei-`q_p-L0I*2~Y$W?I|=@R>1KT^@06`ssh6op^u?TZ^ty)Yo!e%@>~TM|`}HgrbW zX?B`zxx^jB)fOKy8W_B@KXKo2b1S^>`xe6_a7D$aBgAj$e;J!?+8khT9ZB1uXB_?G zJ>SQjuZSJp1$Emir7f;I#K^5{nLeq)ntcK)b@4%{yDA!qxsYmzXgaW_y^6ABkh@g4JT32r&Bj)C>(JD{1FDXK_AyPfx2~g3+ylL>UEp< YcJtnD-dpwzfB)yby+Pld>i<4}01u_uYybcN literal 0 HcmV?d00001 diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index 5c1b6fb726ed7..8cf3550fdb25d 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -42,3 +42,74 @@ SELECT * FROM arrow_simple 2 bar NULL 3 baz false 4 NULL true + +# ARROW partitioned table +statement ok +CREATE EXTERNAL TABLE arrow_partitioned ( + part Int, + f0 Bigint, + f1 String, + f2 Boolean +) +STORED AS ARROW +LOCATION '../core/tests/data/partitioned_table_arrow/' +PARTITIONED BY (part); + +# select wildcard +query ITBI +SELECT * FROM arrow_partitioned ORDER BY f0; +---- +1 foo true 123 +2 bar false 123 +3 baz true 456 +4 NULL NULL 456 + +# select all fields +query IITB +SELECT part, f0, f1, f2 FROM arrow_partitioned ORDER BY f0; +---- +123 1 foo true +123 2 bar false +456 3 baz true +456 4 NULL NULL + +# select without partition column +query IB +SELECT f0, f2 FROM arrow_partitioned ORDER BY f0 +---- +1 true +2 false +3 true +4 NULL + +# select only partition column +query I +SELECT part FROM arrow_partitioned ORDER BY part +---- +123 +123 +456 +456 + +# select without any table-related columns in projection +query I +SELECT 1 FROM arrow_partitioned +---- +1 +1 +1 +1 + +# select with partition filter +query I +SELECT f0 FROM arrow_partitioned WHERE part = 123 ORDER BY f0 +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456 +---- +logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)] +physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0] diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 2e10862bac667..0082f2ecefb96 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -99,9 +99,11 @@ select * from dictionary_encoded_arrow_test_readback; ---- b -# https://github.com/apache/arrow-datafusion/issues/7816 -query error DataFusion error: Arrow error: Schema error: project index 1 out of bounds, max field 1 +query TT select * from dictionary_encoded_arrow_partitioned order by (a); +---- +a foo +b bar # test_insert_into diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index 01b330f24c53f..24c97816fe7fd 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -117,7 +117,7 @@ SELECT part FROM json_partitioned_test ORDER BY part 2 2 -# select without any table-relates columns in projection +# select without any table-related columns in projection query T SELECT 'x' FROM json_partitioned_test ----