diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 981bdf34f539f..40ae75cd7f802 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -72,7 +72,6 @@ pub mod create_drop;
 pub mod explain_analyze;
 pub mod expr;
 pub mod joins;
-pub mod partitioned_csv;
 pub mod repartition;
 pub mod select;
 mod sql_api;
diff --git a/datafusion/core/tests/sql/partitioned_csv.rs b/datafusion/core/tests/sql/partitioned_csv.rs
deleted file mode 100644
index b77557a66cd89..0000000000000
--- a/datafusion/core/tests/sql/partitioned_csv.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Utility functions for creating and running with a partitioned csv dataset.
-
-use std::{io::Write, sync::Arc};
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::{
-    error::Result,
-    prelude::{CsvReadOptions, SessionConfig, SessionContext},
-};
-use tempfile::TempDir;
-
-/// Generate CSV partitions within the supplied directory
-fn populate_csv_partitions(
-    tmp_dir: &TempDir,
-    partition_count: usize,
-    file_extension: &str,
-) -> Result<SchemaRef> {
-    // define schema for data source (csv file)
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::UInt32, false),
-        Field::new("c2", DataType::UInt64, false),
-        Field::new("c3", DataType::Boolean, false),
-    ]));
-
-    // generate a partitioned file
-    for partition in 0..partition_count {
-        let filename = format!("partition-{partition}.{file_extension}");
-        let file_path = tmp_dir.path().join(filename);
-        let mut file = std::fs::File::create(file_path)?;
-
-        // generate some data
-        for i in 0..=10 {
-            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
-            file.write_all(data.as_bytes())?;
-        }
-    }
-
-    Ok(schema)
-}
-
-/// Generate a partitioned CSV file and register it with an execution context
-pub async fn create_ctx(
-    tmp_dir: &TempDir,
-    partition_count: usize,
-) -> Result<SessionContext> {
-    let ctx =
-        SessionContext::new_with_config(SessionConfig::new().with_target_partitions(8));
-
-    let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
-
-    // register csv file with the execution context
-    ctx.register_csv(
-        "test",
-        tmp_dir.path().to_str().unwrap(),
-        CsvReadOptions::new().schema(&schema),
-    )
-    .await?;
-
-    Ok(ctx)
-}
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index cbdea9d729487..4a782e54b070c 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -482,7 +482,7 @@ async fn sort_on_window_null_string() -> Result<()> {
 async fn test_prepare_statement() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;

     // sql to statement then to prepare logical plan with parameters
     // c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and Float64
@@ -529,7 +529,7 @@ async fn test_prepare_statement() -> Result<()> {
 async fn test_named_query_parameters() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;

     // sql to statement then to logical plan with parameters
     // c1 defined as UINT32, c2 defined as UInt64
@@ -576,7 +576,7 @@ async fn test_named_query_parameters() -> Result<()> {
 async fn parallel_query_with_filter() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;

     let dataframe = ctx
         .sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index 9facb064bf32a..5393083e6c53c 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -63,3 +63,76 @@ id6 value"6
 id7 value"7
 id8 value"8
 id9 value"9
+
+
+# Read partitioned csv
+statement ok
+CREATE TABLE src_table_1 (
+int_col INT,
+string_col TEXT,
+bigint_col BIGINT,
+partition_col INT
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 1),
+(3, 'ccc', 300, 1),
+(4, 'ddd', 400, 1);
+
+statement ok
+CREATE TABLE src_table_2 (
+int_col INT,
+string_col TEXT,
+bigint_col BIGINT,
+partition_col INT
+) AS VALUES
+(5, 'eee', 500, 2),
+(6, 'fff', 600, 2),
+(7, 'ggg', 700, 2),
+(8, 'hhh', 800, 2);
+
+query ITII
+COPY src_table_1 TO 'test_files/scratch/csv_files/csv_partitions/1.csv'
+(FORMAT CSV, SINGLE_FILE_OUTPUT true);
+----
+4
+
+
+query ITII
+COPY src_table_2 TO 'test_files/scratch/csv_files/csv_partitions/2.csv'
+(FORMAT CSV, SINGLE_FILE_OUTPUT true);
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE partitioned_table (
+int_col INT,
+string_col TEXT,
+bigint_col BIGINT,
+partition_col INT
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION 'test_files/scratch/csv_files/csv_partitions';
+
+query ITII
+SELECT * FROM partitioned_table ORDER BY int_col;
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+query TT
+EXPLAIN SELECT * FROM partitioned_table ORDER BY int_col;
+----
+logical_plan
+Sort: partitioned_table.int_col ASC NULLS LAST
+--TableScan: partitioned_table projection=[int_col, string_col, bigint_col, partition_col]
+physical_plan
+SortPreservingMergeExec: [int_col@0 ASC NULLS LAST]
+--SortExec: expr=[int_col@0 ASC NULLS LAST]
+----CsvExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/csv_files/csv_partitions/1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/csv_files/csv_partitions/2.csv]]}, projection=[int_col, string_col, bigint_col, partition_col], has_header=true
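Note: the updated tests in `select.rs` now call `create_ctx_with_partition`, whose definition is not part of this diff. The sketch below shows a plausible shape for that helper, assuming it simply carries over the body of the removed `partitioned_csv::create_ctx`; the function name comes from the diff, but its module location and exact contents are assumptions.

```rust
use std::{io::Write, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
    error::Result,
    prelude::{CsvReadOptions, SessionConfig, SessionContext},
};
use tempfile::TempDir;

/// Hypothetical stand-in for the helper referenced by the updated tests:
/// writes `partition_count` small CSV files into `tmp_dir` and registers the
/// directory as table `test` (mirrors the deleted `partitioned_csv::create_ctx`).
pub async fn create_ctx_with_partition(
    tmp_dir: &TempDir,
    partition_count: usize,
) -> Result<SessionContext> {
    let ctx =
        SessionContext::new_with_config(SessionConfig::new().with_target_partitions(8));

    // Schema matches the column comments in the affected tests:
    // c1 is UInt32 and c2 is UInt64.
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::UInt32, false),
        Field::new("c2", DataType::UInt64, false),
        Field::new("c3", DataType::Boolean, false),
    ]));

    // One CSV file per partition, each holding a few generated rows.
    for partition in 0..partition_count {
        let file_path = tmp_dir.path().join(format!("partition-{partition}.csv"));
        let mut file = std::fs::File::create(file_path)?;
        for i in 0..=10 {
            writeln!(file, "{},{},{}", partition, i, i % 2 == 0)?;
        }
    }

    // Registering the directory (not a single file) lets DataFusion scan each
    // CSV file as a separate partition of the `test` table.
    ctx.register_csv(
        "test",
        tmp_dir.path().to_str().unwrap(),
        CsvReadOptions::new().schema(&schema),
    )
    .await?;

    Ok(ctx)
}
```

The new `csv_files.slt` block exercises the same directory-as-table behavior end to end: two single-file COPY outputs land under one LOCATION, and the EXPLAIN output shows the resulting `CsvExec` scanning them as two file groups.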