6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
@@ -257,11 +257,11 @@ mod tests {

     use super::*;
     use crate::execution::options::CsvReadOptions;
-    use crate::logical_plan::*;
+    use crate::physical_plan::functions::ScalarFunctionImplementation;
     use crate::physical_plan::functions::Volatility;
     use crate::physical_plan::{window_functions, ColumnarValue};
     use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
-    use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
+    use crate::{logical_plan::*, test_util};
     use arrow::datatypes::DataType;

     #[tokio::test]
@@ -510,7 +510,7 @@ mod tests {
         ctx: &mut ExecutionContext,
         table_name: &str,
     ) -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let testdata = crate::test_util::arrow_test_data();
         ctx.register_csv(
             table_name,
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/coalesce_partitions.rs
@@ -211,10 +211,11 @@ mod tests {
     use crate::physical_plan::{collect, common};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
+    use crate::test_util;

     #[tokio::test]
     async fn merge() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();

         let num_partitions = 4;
         let (_, files) =
11 changes: 5 additions & 6 deletions datafusion/src/physical_plan/empty.rs
@@ -148,12 +148,11 @@ impl ExecutionPlan for EmptyExec {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::physical_plan::common;
-    use crate::test;
+    use crate::{physical_plan::common, test_util};

     #[tokio::test]
     async fn empty() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();

         let empty = EmptyExec::new(false, schema.clone());
         assert_eq!(empty.schema(), schema);
@@ -168,7 +167,7 @@ mod tests {

     #[test]
     fn with_new_children() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(false, schema);

         let empty2 = empty.with_new_children(vec![])?;
@@ -184,7 +183,7 @@ mod tests {

     #[tokio::test]
     async fn invalid_execute() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(false, schema);

         // ask for the wrong partition
@@ -195,7 +194,7 @@ mod tests {

     #[tokio::test]
     async fn produce_one_row() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(true, schema);

         let iter = empty.execute(0).await?;
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/file_format/csv.rs
@@ -167,7 +167,7 @@ mod tests {
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
         scalar::ScalarValue,
-        test::aggr_test_schema,
+        test_util::aggr_test_schema,
     };
     use futures::StreamExt;

5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -277,8 +277,9 @@ fn create_dict_array(

 #[cfg(test)]
 mod tests {
-    use crate::test::{
-        aggr_test_schema, build_table_i32, columns, object_store::TestObjectStore,
+    use crate::{
+        test::{build_table_i32, columns, object_store::TestObjectStore},
+        test_util::aggr_test_schema,
     };

     use super::*;
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/filter.rs
@@ -227,13 +227,14 @@ mod tests {
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::scalar::ScalarValue;
-    use crate::test::{self};
+    use crate::test;
+    use crate::test_util;
     use crate::{logical_plan::Operator, physical_plan::collect};
     use std::iter::Iterator;

     #[tokio::test]
     async fn simple_predicate() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();

         let partitions = 4;
         let (_, files) =
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/limit.rs
@@ -388,11 +388,11 @@ mod tests {
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
-    use crate::test;
+    use crate::{test, test_util};

     #[tokio::test]
     async fn limit() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();

         let num_partitions = 4;
         let (_, files) =
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/projection.rs
@@ -264,11 +264,12 @@ mod tests {
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::scalar::ScalarValue;
     use crate::test::{self};
+    use crate::test_util;
     use futures::future;

     #[tokio::test]
     async fn project_first_column() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();

         let partitions = 4;
         let (_, files) =
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/sort.rs
@@ -321,13 +321,14 @@ mod tests {
     use crate::test::assert_is_pending;
     use crate::test::exec::assert_strong_count_converges_to_zero;
     use crate::test::{self, exec::BlockingExec};
+    use crate::test_util;
     use arrow::array::*;
     use arrow::datatypes::*;
     use futures::FutureExt;

     #[tokio::test]
     async fn test_sort() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
12 changes: 6 additions & 6 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -664,14 +664,14 @@ mod tests {
     use std::iter::FromIterator;

     use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
-    use crate::assert_batches_eq;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::sort::SortExec;
     use crate::physical_plan::{collect, common};
     use crate::test::{self, assert_is_pending};
+    use crate::{assert_batches_eq, test_util};

     use super::*;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -930,7 +930,7 @@ mod tests {

     #[tokio::test]
     async fn test_partition_sort() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
@@ -1013,7 +1013,7 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         sizes: &[usize],
     ) -> Arc<dyn ExecutionPlan> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
@@ -1041,7 +1041,7 @@ mod tests {

     #[tokio::test]
     async fn test_partition_sort_streaming_input() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let sort = vec![
             // uint8
             PhysicalSortExpr {
@@ -1080,7 +1080,7 @@ mod tests {

     #[tokio::test]
     async fn test_partition_sort_streaming_input_output() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();

         let sort = vec![
             // float64
@@ -1195,7 +1195,7 @@ mod tests {

     #[tokio::test]
     async fn test_async() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
             expr: col("c12", &schema).unwrap(),
             options: SortOptions::default(),
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/union.rs
@@ -219,7 +219,7 @@ fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
 mod tests {
     use super::*;
     use crate::datasource::object_store::{local::LocalFileSystem, ObjectStore};
-    use crate::test;
+    use crate::{test, test_util};

     use crate::{
         physical_plan::{
@@ -232,7 +232,7 @@ mod tests {

     #[tokio::test]
     async fn test_union_partitions() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem {});

         // Create csv's with different partitioning
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/values.rs
@@ -170,11 +170,11 @@ impl ExecutionPlan for ValuesExec {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::test;
+    use crate::test_util;

     #[tokio::test]
     async fn values_empty_case() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = ValuesExec::try_new(schema, vec![]);
         assert!(!empty.is_ok());
         Ok(())
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/windows/mod.rs
@@ -181,14 +181,15 @@ mod tests {
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::physical_plan::{collect, Statistics};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
-    use crate::test::{self, aggr_test_schema, assert_is_pending};
+    use crate::test::{self, assert_is_pending};
+    use crate::test_util::{self, aggr_test_schema};
     use arrow::array::*;
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use futures::FutureExt;

     fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
         let csv = CsvExec::new(
21 changes: 1 addition & 20 deletions datafusion/src/test/mod.rs
@@ -26,7 +26,7 @@ use array::{
     TimestampNanosecondArray, TimestampSecondArray,
 };
 use arrow::array::{self, Int32Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 use futures::{Future, FutureExt};
 use std::fs::File;
@@ -104,25 +104,6 @@ pub fn create_partitioned_csv(
     Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups))
 }

-/// Get the schema for the aggregate_test_* csv files
-pub fn aggr_test_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
-    ]))
-}
-
 /// some tests share a common table with different names
 pub fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
     let schema = Schema::new(vec![
25 changes: 23 additions & 2 deletions datafusion/src/test_util.rs
@@ -17,7 +17,9 @@

 //! Utility functions to make testing DataFusion based crates easier

-use std::{env, error::Error, path::PathBuf};
+use std::{env, error::Error, path::PathBuf, sync::Arc};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

 /// Compares formatted output of a record batch with an expected
 /// vector of strings, with the result of pretty formatting record
@@ -155,7 +157,7 @@ pub fn arrow_test_data() -> String {
     }
 }

-/// Returns the parquest test data directory, which is by default
+/// Returns the parquet test data directory, which is by default
 /// stored in a git submodule rooted at
 /// `parquest-testing/data`.
 ///
@@ -225,6 +227,25 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
     }
 }

+/// Get the schema for the aggregate_test_* csv files
+pub fn aggr_test_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, false),
+        Field::new("c2", DataType::UInt32, false),
+        Field::new("c3", DataType::Int8, false),
+        Field::new("c4", DataType::Int16, false),
+        Field::new("c5", DataType::Int32, false),
+        Field::new("c6", DataType::Int64, false),
+        Field::new("c7", DataType::UInt8, false),
+        Field::new("c8", DataType::UInt16, false),
+        Field::new("c9", DataType::UInt32, false),
+        Field::new("c10", DataType::UInt64, false),
+        Field::new("c11", DataType::Float32, false),
+        Field::new("c12", DataType::Float64, false),
+        Field::new("c13", DataType::Utf8, false),
+    ]))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
40 changes: 0 additions & 40 deletions datafusion/tests/common.rs

This file was deleted.
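
Not part of the diff: a minimal sketch of how a test module might call the relocated helper after this change. The test name and assertions below are hypothetical; only the `test_util::aggr_test_schema()` path and the 13-column c1..c13 schema come from the hunks above.

#[cfg(test)]
mod tests {
    use crate::test_util;

    #[test]
    fn aggr_test_schema_shape() {
        // The shared aggregate_test_100 schema now lives in test_util, not test.
        let schema = test_util::aggr_test_schema();
        assert_eq!(schema.fields().len(), 13);
        assert_eq!(schema.field(0).name(), "c1");
    }
}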
