diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 5fc877a2c8e65..3bd5e3fd54f36 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -673,42 +673,6 @@ async fn test_physical_plan_display_indent_multi_children() { ); } -#[tokio::test] -#[cfg_attr(tarpaulin, ignore)] -async fn csv_explain() { - // This test uses the execute function that create full plan cycle: logical, optimized logical, and physical, - // then execute the physical plan and return the final explain results - let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096)); - register_aggregate_csv_by_sql(&ctx).await; - let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)"; - let actual = execute(&ctx, sql).await; - let actual = normalize_vec_for_explain(actual); - - // Note can't use `assert_batches_eq` as the plan needs to be - // normalized for filenames and number of cores - let expected = vec![ - vec![ - "logical_plan", - "Projection: aggregate_test_100.c1\ - \n Filter: aggregate_test_100.c2 > Int8(10)\ - \n TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]", - ], - vec!["physical_plan", - "ProjectionExec: expr=[c1@0 as c1]\ - \n CoalesceBatchesExec: target_batch_size=4096\ - \n FilterExec: c2@1 > 10\ - \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\ - \n CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]\ - \n", - ]]; - assert_eq!(expected, actual); - - let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10"; - let actual = execute(&ctx, sql).await; - let actual = normalize_vec_for_explain(actual); - assert_eq!(expected, actual); -} - #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn csv_explain_analyze() { @@ -819,18 +783,3 @@ async fn 
explain_physical_plan_only() { ]]; assert_eq!(expected, actual); } - -#[tokio::test] -async fn explain_nested() { - async fn test_nested_explain(explain_phy_plan_flag: bool) { - let mut config = ConfigOptions::new(); - config.explain.physical_plan_only = explain_phy_plan_flag; - let ctx = SessionContext::with_config(config.into()); - let sql = "EXPLAIN explain select 1"; - let err = ctx.sql(sql).await.unwrap_err(); - assert!(err.to_string().contains("Explain must be root of the plan")); - } - - test_nested_explain(true).await; - test_nested_explain(false).await; -} diff --git a/datafusion/core/tests/sqllogictests/README.md b/datafusion/core/tests/sqllogictests/README.md index afd6f7d3c150f..72a6c69048b06 100644 --- a/datafusion/core/tests/sqllogictests/README.md +++ b/datafusion/core/tests/sqllogictests/README.md @@ -78,7 +78,7 @@ docker run \ In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the database engine. The output is is a full script that is a copy of the prototype script with result inserted. -You can update tests by passing the `--complete` argument. +You can update the tests / generate expected output by passing the `--complete` argument. 
```shell # Update ddl.slt with output from running diff --git a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs index 9978e75264aed..0d013c47b3484 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/conversion.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/conversion.rs @@ -34,7 +34,7 @@ pub fn varchar_to_str(value: &str) -> String { if value.is_empty() { "(empty)".to_string() } else { - value.to_string() + value.trim_end_matches('\n').to_string() } } diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs index f9e4b631378f9..2f0705bf147f3 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs @@ -18,7 +18,9 @@ use arrow::datatypes::SchemaRef; use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch}; use datafusion_common::DataFusionError; +use lazy_static::lazy_static; use sqllogictest::DBOutput; +use std::path::PathBuf; use crate::output::{DFColumnType, DFOutput}; @@ -53,12 +55,105 @@ pub fn convert_batches(batches: Vec<RecordBatch>) -> Result<DFOutput> { ), ))); } - rows.append(&mut convert_batch(batch)?); + + let new_rows = convert_batch(batch)?
+ .into_iter() + .flat_map(expand_row) + .map(normalize_paths); + rows.extend(new_rows); } Ok(DBOutput::Rows { types, rows }) } +/// special case rows that have newlines in them (like explain plans) +/// +/// Transform inputs like: +/// ```text +/// [ +/// "logical_plan", +/// "Sort: d.b ASC NULLS LAST\n Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +/// +/// Into one cell per line, adding lines if necessary +/// ```text +/// [ +/// "logical_plan", +/// ] +/// [ +/// "Sort: d.b ASC NULLS LAST", +/// ] +/// [ <--- newly added row +/// " Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +fn expand_row(mut row: Vec<String>) -> impl Iterator<Item = Vec<String>> { + use itertools::Either; + use std::iter::once; + + // check last cell + if let Some(cell) = row.pop() { + let lines: Vec<_> = cell.split('\n').collect(); + + // no newlines in last cell + if lines.len() < 2 { + row.push(cell); + return Either::Left(once(row)); + } + + // form new rows with each additional line + let new_lines: Vec<_> = lines.into_iter().map(|l| vec![l.to_string()]).collect(); + + Either::Right(once(row).chain(new_lines.into_iter())) + } else { + Either::Left(once(row)) + } +} + +/// normalize path references +/// +/// ```text +/// CsvExec: files={1 group: [[path/to/datafusion/testing/data/csv/aggregate_test_100.csv]]}, ... +/// ``` +/// +/// into: +/// +/// ```text +/// CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, ... +/// ``` +fn normalize_paths(mut row: Vec<String>) -> Vec<String> { + row.iter_mut().for_each(|s| { + let workspace_root: &str = WORKSPACE_ROOT.as_ref(); + if s.contains(workspace_root) { + *s = s.replace(workspace_root, "WORKSPACE_ROOT"); + } + }); + row +} + +/// return the location of the datafusion checkout +fn workspace_root() -> object_store::path::Path { + // e.g. /Software/arrow-datafusion/datafusion/core + let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + // e.g.
 /Software/arrow-datafusion/datafusion + let workspace_root = dir + .parent() + .expect("Can not find parent of datafusion/core") + // e.g. /Software/arrow-datafusion + .parent() + .expect("parent of datafusion") + .to_string_lossy(); + + object_store::path::Path::parse(workspace_root).unwrap() +} + +// holds the root directory of the datafusion checkout, computed once on first use +lazy_static! { + static ref WORKSPACE_ROOT: object_store::path::Path = workspace_root(); +} + /// Check two schemas for being equal for field names/types fn equivalent_names_and_types(schema: &SchemaRef, other: SchemaRef) -> bool { if schema.fields().len() != other.fields().len() { diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt new file mode 100644 index 0000000000000..9192a09471480 --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +query ?? +explain SELECT c1 FROM aggregate_test_100 where c2 > 10 +---- +logical_plan +Projection: aggregate_test_100.c1 + Filter: aggregate_test_100.c2 > Int8(10) + TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] +physical_plan +ProjectionExec: expr=[c1@0 as c1] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c2@1 > 10 + RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 + CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2] + + +## explain_physical_plan_only + +statement ok +set datafusion.explain.physical_plan_only = true + +query ?? +EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 150)) as t (c1,c2,c3) +---- +physical_plan +ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] + ProjectionExec: expr=[2 as COUNT(UInt8(1))] + EmptyExec: produce_one_row=true + +statement ok +set datafusion.explain.physical_plan_only = false + + +## explain nested +statement error Explain must be root of the plan +EXPLAIN explain select 1 + +statement ok +set datafusion.explain.physical_plan_only = true + +statement error Explain must be root of the plan +EXPLAIN explain select 1 + +statement ok +set datafusion.explain.physical_plan_only = false