From f899b05407ef9585717ba384d2d63455f0c944e4 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 19 Nov 2021 16:15:12 +0100 Subject: [PATCH] [fix] moved aggr_test_schema to test_utils This avoids that its duplicated in the benchmark package. --- datafusion/src/execution/dataframe_impl.rs | 6 +-- .../src/physical_plan/coalesce_partitions.rs | 3 +- datafusion/src/physical_plan/empty.rs | 11 +++-- .../src/physical_plan/file_format/csv.rs | 2 +- .../src/physical_plan/file_format/mod.rs | 5 ++- datafusion/src/physical_plan/filter.rs | 5 ++- datafusion/src/physical_plan/limit.rs | 4 +- datafusion/src/physical_plan/projection.rs | 3 +- datafusion/src/physical_plan/sort.rs | 3 +- .../physical_plan/sort_preserving_merge.rs | 12 +++--- datafusion/src/physical_plan/union.rs | 4 +- datafusion/src/physical_plan/values.rs | 4 +- datafusion/src/physical_plan/windows/mod.rs | 5 ++- datafusion/src/test/mod.rs | 21 +--------- datafusion/src/test_util.rs | 25 +++++++++++- datafusion/tests/common.rs | 40 ------------------- datafusion/tests/path_partition.rs | 6 +-- datafusion/tests/sql.rs | 5 +-- 18 files changed, 64 insertions(+), 100 deletions(-) delete mode 100644 datafusion/tests/common.rs diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index f565f5c3507a6..2887e29ada7ed 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -257,11 +257,11 @@ mod tests { use super::*; use crate::execution::options::CsvReadOptions; - use crate::logical_plan::*; + use crate::physical_plan::functions::ScalarFunctionImplementation; use crate::physical_plan::functions::Volatility; use crate::physical_plan::{window_functions, ColumnarValue}; use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext}; - use crate::{physical_plan::functions::ScalarFunctionImplementation, test}; + use crate::{logical_plan::*, test_util}; use arrow::datatypes::DataType; #[tokio::test] @@ -510,7 +510,7 @@ mod tests { ctx: &mut ExecutionContext, table_name: &str, ) -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let testdata = crate::test_util::arrow_test_data(); ctx.register_csv( table_name, diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs index 9c133def82097..089c6b4617aa8 100644 --- a/datafusion/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -211,10 +211,11 @@ mod tests { use crate::physical_plan::{collect, common}; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending}; + use crate::test_util; #[tokio::test] async fn merge() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let num_partitions = 4; let (_, files) = diff --git a/datafusion/src/physical_plan/empty.rs b/datafusion/src/physical_plan/empty.rs index 430beaf592e20..46b50020fe0d3 100644 --- a/datafusion/src/physical_plan/empty.rs +++ b/datafusion/src/physical_plan/empty.rs @@ -148,12 +148,11 @@ impl ExecutionPlan for EmptyExec { #[cfg(test)] mod tests { use super::*; - use crate::physical_plan::common; - use crate::test; + use crate::{physical_plan::common, test_util}; #[tokio::test] async fn empty() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(false, schema.clone()); assert_eq!(empty.schema(), schema); @@ -168,7 +167,7 @@ mod tests { #[test] fn with_new_children() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(false, schema); let empty2 = empty.with_new_children(vec![])?; @@ -184,7 +183,7 @@ mod tests { #[tokio::test] async fn invalid_execute() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(false, schema); // ask for the wrong partition @@ -195,7 +194,7 @@ mod tests { #[tokio::test] async fn produce_one_row() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(true, schema); let iter = empty.execute(0).await?; diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 0057e9e811abb..efea300bc8eef 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -167,7 +167,7 @@ mod tests { use crate::{ datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem}, scalar::ScalarValue, - test::aggr_test_schema, + test_util::aggr_test_schema, }; use futures::StreamExt; diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 3ad8ffefe41d5..17ec9f13424d2 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -277,8 +277,9 @@ fn create_dict_array( #[cfg(test)] mod tests { - use crate::test::{ - aggr_test_schema, build_table_i32, columns, object_store::TestObjectStore, + use crate::{ + test::{build_table_i32, columns, object_store::TestObjectStore}, + test_util::aggr_test_schema, }; use super::*; diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index fe0f10313451f..a32371a1e4810 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -227,13 +227,14 @@ mod tests { use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; use crate::physical_plan::ExecutionPlan; use crate::scalar::ScalarValue; - use crate::test::{self}; + use crate::test; + use crate::test_util; use crate::{logical_plan::Operator, physical_plan::collect}; use std::iter::Iterator; #[tokio::test] async fn simple_predicate() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index f9c392a9056a0..ef492ec18320f 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -388,11 +388,11 @@ mod tests { use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::common; use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; - use crate::test; + use crate::{test, test_util}; #[tokio::test] async fn limit() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let num_partitions = 4; let (_, files) = diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index eb335c2100acf..eb0c4b8e047b1 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -264,11 +264,12 @@ mod tests { use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; use crate::scalar::ScalarValue; use crate::test::{self}; + use crate::test_util; use futures::future; #[tokio::test] async fn project_first_column() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index a606906e86806..5eb29bbd01f9f 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -321,13 +321,14 @@ mod tests { use crate::test::assert_is_pending; use crate::test::exec::assert_strong_count_converges_to_zero; use crate::test::{self, exec::BlockingExec}; + use crate::test_util; use arrow::array::*; use arrow::datatypes::*; use futures::FutureExt; #[tokio::test] async fn test_sort() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 62f4b941f7f7d..3f4827ba6a0ae 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -664,7 +664,6 @@ mod tests { use std::iter::FromIterator; use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray}; - use crate::assert_batches_eq; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; @@ -672,6 +671,7 @@ mod tests { use crate::physical_plan::sort::SortExec; use crate::physical_plan::{collect, common}; use crate::test::{self, assert_is_pending}; + use crate::{assert_batches_eq, test_util}; use super::*; use arrow::datatypes::{DataType, Field, Schema}; @@ -930,7 +930,7 @@ mod tests { #[tokio::test] async fn test_partition_sort() { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap(); @@ -1013,7 +1013,7 @@ mod tests { sort: Vec, sizes: &[usize], ) -> Arc { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let partitions = 4; let (_, files) = test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap(); @@ -1041,7 +1041,7 @@ mod tests { #[tokio::test] async fn test_partition_sort_streaming_input() { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let sort = vec![ // uint8 PhysicalSortExpr { @@ -1080,7 +1080,7 @@ mod tests { #[tokio::test] async fn test_partition_sort_streaming_input_output() { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let sort = vec![ // float64 @@ -1195,7 +1195,7 @@ mod tests { #[tokio::test] async fn test_async() { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let sort = vec![PhysicalSortExpr { expr: col("c12", &schema).unwrap(), options: SortOptions::default(), diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index 418be630bed99..79c50720496d8 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -219,7 +219,7 @@ fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { mod tests { use super::*; use crate::datasource::object_store::{local::LocalFileSystem, ObjectStore}; - use crate::test; + use crate::{test, test_util}; use crate::{ physical_plan::{ @@ -232,7 +232,7 @@ mod tests { #[tokio::test] async fn test_union_partitions() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let fs: Arc = Arc::new(LocalFileSystem {}); // Create csv's with different partitioning diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs index de15d40c68f9a..f4f8ccb6246ac 100644 --- a/datafusion/src/physical_plan/values.rs +++ b/datafusion/src/physical_plan/values.rs @@ -170,11 +170,11 @@ impl ExecutionPlan for ValuesExec { #[cfg(test)] mod tests { use super::*; - use crate::test; + use crate::test_util; #[tokio::test] async fn values_empty_case() -> Result<()> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let empty = ValuesExec::try_new(schema, vec![]); assert!(!empty.is_ok()); Ok(()) diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index 28bf402936129..8b182f9a6138b 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -181,14 +181,15 @@ mod tests { use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; use crate::physical_plan::{collect, Statistics}; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::test::{self, aggr_test_schema, assert_is_pending}; + use crate::test::{self, assert_is_pending}; + use crate::test_util::{self, aggr_test_schema}; use arrow::array::*; use arrow::datatypes::{DataType, Field, SchemaRef}; use arrow::record_batch::RecordBatch; use futures::FutureExt; fn create_test_schema(partitions: usize) -> Result<(Arc, SchemaRef)> { - let schema = test::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); let (_, files) = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; let csv = CsvExec::new( diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index c13df55c05dfd..16c1383c119fa 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -26,7 +26,7 @@ use array::{ TimestampNanosecondArray, TimestampSecondArray, }; use arrow::array::{self, Int32Array}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use futures::{Future, FutureExt}; use std::fs::File; @@ -104,25 +104,6 @@ pub fn create_partitioned_csv( Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups)) } -/// Get the schema for the aggregate_test_* csv files -pub fn aggr_test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), - Field::new("c2", DataType::UInt32, false), - Field::new("c3", DataType::Int8, false), - Field::new("c4", DataType::Int16, false), - Field::new("c5", DataType::Int32, false), - Field::new("c6", DataType::Int64, false), - Field::new("c7", DataType::UInt8, false), - Field::new("c8", DataType::UInt16, false), - Field::new("c9", DataType::UInt32, false), - Field::new("c10", DataType::UInt64, false), - Field::new("c11", DataType::Float32, false), - Field::new("c12", DataType::Float64, false), - Field::new("c13", DataType::Utf8, false), - ])) -} - /// some tests share a common table with different names pub fn test_table_scan_with_name(name: &str) -> Result { let schema = Schema::new(vec![ diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs index 03e0054b15706..b87b756dba123 100644 --- a/datafusion/src/test_util.rs +++ b/datafusion/src/test_util.rs @@ -17,7 +17,9 @@ //! Utility functions to make testing DataFusion based crates easier -use std::{env, error::Error, path::PathBuf}; +use std::{env, error::Error, path::PathBuf, sync::Arc}; + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; /// Compares formatted output of a record batch with an expected /// vector of strings, with the result of pretty formatting record @@ -155,7 +157,7 @@ pub fn arrow_test_data() -> String { } } -/// Returns the parquest test data directory, which is by default +/// Returns the parquet test data directory, which is by default /// stored in a git submodule rooted at /// `parquest-testing/data`. /// @@ -225,6 +227,25 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, false), + Field::new("c2", DataType::UInt32, false), + Field::new("c3", DataType::Int8, false), + Field::new("c4", DataType::Int16, false), + Field::new("c5", DataType::Int32, false), + Field::new("c6", DataType::Int64, false), + Field::new("c7", DataType::UInt8, false), + Field::new("c8", DataType::UInt16, false), + Field::new("c9", DataType::UInt32, false), + Field::new("c10", DataType::UInt64, false), + Field::new("c11", DataType::Float32, false), + Field::new("c12", DataType::Float64, false), + Field::new("c13", DataType::Utf8, false), + ])) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/tests/common.rs b/datafusion/tests/common.rs deleted file mode 100644 index 3490db5e091fd..0000000000000 --- a/datafusion/tests/common.rs +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! methods that are common to multiple integration test setups - -use std::sync::Arc; - -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - -pub fn aggr_test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), - Field::new("c2", DataType::UInt32, false), - Field::new("c3", DataType::Int8, false), - Field::new("c4", DataType::Int16, false), - Field::new("c5", DataType::Int32, false), - Field::new("c6", DataType::Int64, false), - Field::new("c7", DataType::UInt8, false), - Field::new("c8", DataType::UInt16, false), - Field::new("c9", DataType::UInt32, false), - Field::new("c10", DataType::UInt64, false), - Field::new("c11", DataType::Float32, false), - Field::new("c12", DataType::Float64, false), - Field::new("c13", DataType::Utf8, false), - ])) -} diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs index 789511065fc8b..e68ef32fa3eeb 100644 --- a/datafusion/tests/path_partition.rs +++ b/datafusion/tests/path_partition.rs @@ -33,12 +33,10 @@ use datafusion::{ error::{DataFusionError, Result}, physical_plan::ColumnStatistics, prelude::ExecutionContext, - test_util::{arrow_test_data, parquet_test_data}, + test_util::{self, arrow_test_data, parquet_test_data}, }; use futures::{stream, StreamExt}; -mod common; - #[tokio::test] async fn csv_filter_with_file_col() -> Result<()> { let mut ctx = ExecutionContext::new(); @@ -281,7 +279,7 @@ fn register_partitioned_aggregate_csv( ) { let testdata = arrow_test_data(); let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata); - let file_schema = common::aggr_test_schema(); + let file_schema = test_util::aggr_test_schema(); let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths); let mut options = ListingOptions::new(Arc::new(CsvFormat::default())); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 63d4d691181cb..ac48faca26c3e 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -43,6 +43,7 @@ use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::ExecutionPlanVisitor; use datafusion::prelude::*; +use datafusion::test_util; use datafusion::{datasource::MemTable, physical_plan::collect}; use datafusion::{ error::{DataFusionError, Result}, @@ -50,8 +51,6 @@ use datafusion::{ }; use datafusion::{execution::context::ExecutionContext, physical_plan::displayable}; -mod common; - #[tokio::test] async fn nyc() -> Result<()> { // schema for nyxtaxi csv files @@ -3461,7 +3460,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) { async fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> { let testdata = datafusion::test_util::arrow_test_data(); - let schema = common::aggr_test_schema(); + let schema = test_util::aggr_test_schema(); ctx.register_csv( "aggregate_test_100", &format!("{}/csv/aggregate_test_100.csv", testdata),