From cded193927d3cbd9b76bb71792d08102ccdeb788 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 11 Feb 2023 06:48:38 -0500 Subject: [PATCH 1/4] Port some explain test to sqllogictest, add filename normalization --- datafusion/core/tests/sql/explain_analyze.rs | 51 --------- datafusion/core/tests/sqllogictests/README.md | 2 +- .../sqllogictests/src/engines/conversion.rs | 2 +- .../src/engines/datafusion/normalize.rs | 101 +++++++++++++++++- .../sqllogictests/test_files/explain.slt | 81 ++++++++++++++ 5 files changed, 181 insertions(+), 56 deletions(-) create mode 100644 datafusion/core/tests/sqllogictests/test_files/explain.slt diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 5fc877a2c8e65..3bd5e3fd54f36 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -673,42 +673,6 @@ async fn test_physical_plan_display_indent_multi_children() { ); } -#[tokio::test] -#[cfg_attr(tarpaulin, ignore)] -async fn csv_explain() { - // This test uses the execute function that create full plan cycle: logical, optimized logical, and physical, - // then execute the physical plan and return the final explain results - let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096)); - register_aggregate_csv_by_sql(&ctx).await; - let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)"; - let actual = execute(&ctx, sql).await; - let actual = normalize_vec_for_explain(actual); - - // Note can't use `assert_batches_eq` as the plan needs to be - // normalized for filenames and number of cores - let expected = vec![ - vec![ - "logical_plan", - "Projection: aggregate_test_100.c1\ - \n Filter: aggregate_test_100.c2 > Int8(10)\ - \n TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]", - ], - vec!["physical_plan", - "ProjectionExec: expr=[c1@0 as c1]\ - \n CoalesceBatchesExec: target_batch_size=4096\ - \n FilterExec: c2@1 > 10\ - \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\ - \n CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]\ - \n", - ]]; - assert_eq!(expected, actual); - - let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10"; - let actual = execute(&ctx, sql).await; - let actual = normalize_vec_for_explain(actual); - assert_eq!(expected, actual); -} - #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn csv_explain_analyze() { @@ -819,18 +783,3 @@ async fn explain_physical_plan_only() { ]]; assert_eq!(expected, actual); } - -#[tokio::test] -async fn explain_nested() { - async fn test_nested_explain(explain_phy_plan_flag: bool) { - let mut config = ConfigOptions::new(); - config.explain.physical_plan_only = explain_phy_plan_flag; - let ctx = SessionContext::with_config(config.into()); - let sql = "EXPLAIN explain select 1"; - let err = ctx.sql(sql).await.unwrap_err(); - assert!(err.to_string().contains("Explain must be root of the plan")); - } - - test_nested_explain(true).await; - test_nested_explain(false).await; -} diff --git a/datafusion/core/tests/sqllogictests/README.md b/datafusion/core/tests/sqllogictests/README.md index afd6f7d3c150f..264e5ea35d5c5 100644 --- a/datafusion/core/tests/sqllogictests/README.md +++ b/datafusion/core/tests/sqllogictests/README.md @@ -78,7 +78,7 @@ docker run \ In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the database engine. The output is is a full script that is a copy of the prototype script with result inserted. -You can update tests by passing the `--complete` argument. +You can update the tests / generate expexcted output by passing the `--complete` argument. ```shell # Update ddl.slt with output from running diff --git a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs index 9978e75264aed..0d013c47b3484 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs @@ -34,7 +34,7 @@ pub fn varchar_to_str(value: &str) -> String { if value.is_empty() { "(empty)".to_string() } else { - value.to_string() + value.trim_end_matches('\n').to_string() } } diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs index 601033e6ac3c8..07f86c64acf7f 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::path::PathBuf; + +use crate::output::{DFColumnType, DFOutput}; use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch}; use datafusion::error::DataFusionError; +use lazy_static::lazy_static; use sqllogictest::DBOutput; -use crate::output::{DFColumnType, DFOutput}; - use super::super::conversion::*; use super::error::{DFSqlLogicTestError, Result}; @@ -52,12 +54,105 @@ pub fn convert_batches(batches: Vec) -> Result { ), ))); } - rows.append(&mut convert_batch(batch)?); + + let new_rows = convert_batch(batch)? + .into_iter() + .flat_map(expand_row) + .map(normalize_paths); + rows.extend(new_rows); } Ok(DBOutput::Rows { types, rows }) } +/// special case rows that have newlines in them (like explain plans) +// +/// Transform inputs like: +/// ```text +/// [ +/// "logical_plan", +/// "Sort: d.b ASC NULLS LAST\n Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +/// +/// Into one cell per line, adding lines if necessary +/// ```text +/// [ +/// "logical_plan", +/// ] +/// [ +/// "Sort: d.b ASC NULLS LAST", +/// ] +/// [ <--- newly added row +/// " Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +fn expand_row(mut row: Vec) -> impl Iterator> { + use itertools::Either; + use std::iter::once; + + // check last cell + if let Some(cell) = row.pop() { + let lines: Vec<_> = cell.split('\n').collect(); + + // no newlines in last cell + if lines.is_empty() { + row.push(cell); + return Either::Left(once(row)); + } + + // form new rows with each additional line + let new_lines: Vec<_> = lines.into_iter().map(|l| vec![l.to_string()]).collect(); + + Either::Right(once(row).chain(new_lines.into_iter())) + } else { + Either::Left(once(row)) + } +} + +/// normalize path references +/// +/// ``` +/// CsvExec: files={1 group: [[path/to/datafusion/testing/data/csv/aggregate_test_100.csv]]}, ... +/// ``` +/// +/// into: +/// +/// ``` +/// CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, ... +/// ``` +fn normalize_paths(mut row: Vec) -> Vec { + row.iter_mut().for_each(|s| { + let workspace_root: &str = WORKSPACE_ROOT.as_ref(); + if s.contains(workspace_root) { + *s = s.replace(workspace_root, "WORKSPACE_ROOT"); + } + }); + row +} + +/// return the location of the datafusion checkout +fn workspace_root() -> object_store::path::Path { + // e.g. /Software/arrow-datafusion/datafusion/core + let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + // e.g. /Software/arrow-datafusion/datafusion + let workspace_root = dir + .parent() + .expect("Can not find parent of datafusion/core") + // e.g. /Software/arrow-datafusion + .parent() + .expect("parent of datafusion") + .to_string_lossy(); + + object_store::path::Path::parse(workspace_root).unwrap() +} + +// holds the root directory ( +lazy_static! { + static ref WORKSPACE_ROOT: object_store::path::Path = workspace_root(); +} + /// Convert a single batch to a `Vec>` for comparison fn convert_batch(batch: RecordBatch) -> Result>> { (0..batch.num_rows()) diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt new file mode 100644 index 0000000000000..9192a09471480 --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +query ?? +explain SELECT c1 FROM aggregate_test_100 where c2 > 10 +---- +logical_plan +Projection: aggregate_test_100.c1 + Filter: aggregate_test_100.c2 > Int8(10) + TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] +physical_plan +ProjectionExec: expr=[c1@0 as c1] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c2@1 > 10 + RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 + CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2] + + +## explain_physical_plan_only + +statement ok +set datafusion.explain.physical_plan_only = true + +query ?? +EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 150)) as t (c1,c2,c3) +---- +physical_plan +ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] + ProjectionExec: expr=[2 as COUNT(UInt8(1))] + EmptyExec: produce_one_row=true + +statement ok +set datafusion.explain.physical_plan_only = false + + +## explain nested +statement error Explain must be root of the plan +EXPLAIN explain select 1 + +statement ok +set datafusion.explain.physical_plan_only = true + +statement error Explain must be root of the plan +EXPLAIN explain select 1 + +statement ok +set datafusion.explain.physical_plan_only = false From d2a8496e3d0fc5e89eae421ffd9041bd2345cfc7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 11 Feb 2023 08:09:06 -0500 Subject: [PATCH 2/4] fix: newline --- .../tests/sqllogictests/src/engines/datafusion/normalize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs index 07f86c64acf7f..3a03210633151 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs @@ -96,7 +96,7 @@ fn expand_row(mut row: Vec) -> impl Iterator> { let lines: Vec<_> = cell.split('\n').collect(); // no newlines in last cell - if lines.is_empty() { + if lines.len() < 2 { row.push(cell); return Either::Left(once(row)); } From 0f05d484b72df39c28d73b7aaa284ef4027eb223 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 11 Feb 2023 16:45:22 -0500 Subject: [PATCH 3/4] Update datafusion/core/tests/sqllogictests/README.md Co-authored-by: Yevhenii Melnyk --- datafusion/core/tests/sqllogictests/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sqllogictests/README.md b/datafusion/core/tests/sqllogictests/README.md index 264e5ea35d5c5..72a6c69048b06 100644 --- a/datafusion/core/tests/sqllogictests/README.md +++ b/datafusion/core/tests/sqllogictests/README.md @@ -78,7 +78,7 @@ docker run \ In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the database engine. The output is is a full script that is a copy of the prototype script with result inserted. -You can update the tests / generate expexcted output by passing the `--complete` argument. +You can update the tests / generate expected output by passing the `--complete` argument. ```shell # Update ddl.slt with output from running From 7a784c25968e2e17652eee0aa3f2a306f9be4988 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 12 Feb 2023 09:04:16 -0500 Subject: [PATCH 4/4] fix: include --- .../core/tests/sqllogictests/src/engines/datafusion/normalize.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs index bb384190fe6ca..2f0705bf147f3 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs @@ -18,6 +18,7 @@ use arrow::datatypes::SchemaRef; use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch}; use datafusion_common::DataFusionError; +use lazy_static::lazy_static; use sqllogictest::DBOutput; use std::path::PathBuf;