Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::path::PathBuf;
use std::time::Duration;

use sqllogictest::DBOutput;
Expand All @@ -36,12 +37,12 @@ mod util;

pub struct DataFusion {
ctx: SessionContext,
file_name: String,
relative_path: PathBuf,
}

impl DataFusion {
pub fn new(ctx: SessionContext, file_name: String) -> Self {
Self { ctx, file_name }
pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self {
Self { ctx, relative_path }
}
}

Expand All @@ -50,7 +51,11 @@ impl sqllogictest::AsyncDB for DataFusion {
type Error = DFSqlLogicTestError;

async fn run(&mut self, sql: &str) -> Result<DBOutput> {
println!("[{}] Running query: \"{}\"", self.file_name, sql);
println!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
);
let result = run_query(&self.ctx, sql).await?;
Ok(result)
}
Expand Down
41 changes: 25 additions & 16 deletions datafusion/core/tests/sqllogictests/src/engines/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::path::{Path, PathBuf};
use std::str::FromStr;

use async_trait::async_trait;
Expand Down Expand Up @@ -47,13 +48,13 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct Postgres {
client: tokio_postgres::Client,
join_handle: JoinHandle<()>,
/// Filename, for display purposes
file_name: String,
/// Relative test file path
relative_path: PathBuf,
}

impl Postgres {
/// Creates a runner for executing queries against an existing
/// posgres connection. `file_name` is used for display output
/// Creates a runner for executing queries against an existing postgres connection.
/// `relative_path` is used for display output and to create a postgres schema.
///
/// The database connection details can be overridden by the
/// `PG_URI` environment variable.
Expand All @@ -65,9 +66,7 @@ impl Postgres {
/// ```
///
/// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format
pub async fn connect(file_name: impl Into<String>) -> Result<Self> {
let file_name = file_name.into();

pub async fn connect(relative_path: PathBuf) -> Result<Self> {
let uri =
std::env::var("PG_URI").map_or(PG_URI.to_string(), std::convert::identity);

Expand All @@ -89,7 +88,7 @@ impl Postgres {
}
});

let schema = schema_name(&file_name);
let schema = schema_name(&relative_path);

// create a new clean schema for running the test
debug!("Creating new empty schema '{schema}'");
Expand All @@ -108,7 +107,7 @@ impl Postgres {
Ok(Self {
client,
join_handle,
file_name,
relative_path,
})
}

Expand Down Expand Up @@ -188,12 +187,18 @@ fn no_quotes(t: &str) -> &str {

/// Given a file name like pg_compat_foo.slt
/// return a schema name
fn schema_name(file_name: &str) -> &str {
file_name
.split('.')
.next()
.unwrap_or("default_schema")
.trim_start_matches("pg_")
fn schema_name(relative_path: &Path) -> String {
relative_path
.file_name()
.map(|name| {
name.to_string_lossy()
.chars()
.filter(|ch| ch.is_ascii_alphabetic())
.collect::<String>()
.trim_start_matches("pg_")
.to_string()
})
.unwrap_or_else(|| "default_schema".to_string())
}

impl Drop for Postgres {
Expand Down Expand Up @@ -249,7 +254,11 @@ impl sqllogictest::AsyncDB for Postgres {
type Error = Error;

async fn run(&mut self, sql: &str) -> Result<DBOutput, Self::Error> {
println!("[{}] Running query: \"{}\"", self.file_name, sql);
println!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
);

let lower_sql = sql.trim_start().to_ascii_lowercase();

Expand Down
92 changes: 56 additions & 36 deletions datafusion/core/tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod engines;
mod setup;
mod utils;

const TEST_DIRECTORY: &str = "tests/sqllogictests/test_files";
const TEST_DIRECTORY: &str = "tests/sqllogictests/test_files/";
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";

#[tokio::main]
Expand All @@ -47,40 +47,37 @@ pub async fn main() -> Result<(), Box<dyn Error>> {

let options = Options::new();

let files: Vec<_> = read_test_files(&options);

info!("Running test files {:?}", files);

for path in files {
let file_name = path.file_name().unwrap().to_str().unwrap().to_string();

for (path, relative_path) in read_test_files(&options) {
if options.complete_mode {
run_complete_file(&path, file_name).await?;
run_complete_file(&path, relative_path).await?;
} else if options.postgres_runner {
run_test_file_with_postgres(&path, file_name).await?;
run_test_file_with_postgres(&path, relative_path).await?;
} else {
run_test_file(&path, file_name).await?;
run_test_file(&path, relative_path).await?;
}
}

Ok(())
}

async fn run_test_file(path: &PathBuf, file_name: String) -> Result<(), Box<dyn Error>> {
println!("Running with DataFusion runner: {}", path.display());
let ctx = context_for_test_file(&file_name).await;
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, file_name));
async fn run_test_file(
path: &Path,
relative_path: PathBuf,
) -> Result<(), Box<dyn Error>> {
info!("Running with DataFusion runner: {}", path.display());
let ctx = context_for_test_file(&relative_path).await;
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
runner.run_file_async(path).await?;
Ok(())
}

async fn run_test_file_with_postgres(
path: &PathBuf,
file_name: String,
path: &Path,
relative_path: PathBuf,
) -> Result<(), Box<dyn Error>> {
info!("Running with Postgres runner: {}", path.display());

let postgres_client = Postgres::connect(file_name).await?;
let postgres_client = Postgres::connect(relative_path).await?;

sqllogictest::Runner::new(postgres_client)
.run_file_async(path)
Expand All @@ -90,17 +87,15 @@ async fn run_test_file_with_postgres(
}

async fn run_complete_file(
path: &PathBuf,
file_name: String,
path: &Path,
relative_path: PathBuf,
) -> Result<(), Box<dyn Error>> {
use sqllogictest::{default_validator, update_test_file};

info!("Using complete mode to complete: {}", path.display());

let ctx = context_for_test_file(&file_name).await;
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, file_name));

info!("Using complete mode to complete {}", path.display());
let ctx = context_for_test_file(&relative_path).await;
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
let col_separator = " ";
let validator = default_validator;
update_test_file(path, &mut runner, col_separator, validator)
Expand All @@ -110,18 +105,42 @@ async fn run_complete_file(
Ok(())
}

fn read_test_files(options: &Options) -> Vec<PathBuf> {
std::fs::read_dir(TEST_DIRECTORY)
.unwrap()
.map(|path| path.unwrap().path())
.filter(|path| options.check_test_file(path.as_path()))
.filter(|path| options.check_pg_compat_file(path.as_path()))
.collect()
fn read_test_files<'a>(
options: &'a Options,
) -> Box<dyn Iterator<Item = (PathBuf, PathBuf)> + 'a> {
Box::new(
read_dir_recursive(TEST_DIRECTORY)
.map(|path| {
(
path.clone(),
PathBuf::from(
path.to_string_lossy().strip_prefix(TEST_DIRECTORY).unwrap(),
),
)
})
.filter(|(_, relative_path)| options.check_test_file(relative_path))
.filter(|(path, _)| options.check_pg_compat_file(path.as_path())),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Postgres file filter doesn't account for folder names, but it can. For example, instead of expecting a pg_compat prefix on the file name, the runner could check this prefix on a relative file path.

)
}

fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBuf>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WalkDir crate seemed like an overkill because there is no need for link resolution and similar features, but only recursive traversing.
Of course, WalkDir can be thrown in here instead of the custom code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree walkdir is not needed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might be able to avoid the Box using something like

Suggested change
fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBuf>> {
fn read_dir_recursive<P: AsRef<Path>>(path: P) -> impl Iterator<Item = PathBuf> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that the resolved type for the whole implementation inside this function is FlatMap. But inside flat_map there is also std::iter::once for the case when there is only one file. And then rust cannot resolve a concrete implementation based on these options.

It is probably possible to apply your suggestion, but my rust skills were not good enough.

Copy link
Contributor

@alamb alamb Jan 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think https://docs.rs/itertools/latest/itertools/enum.Either.html# is one way to handle this. However, I don't think it is important to change

Box::new(
std::fs::read_dir(path)
.expect("Readable directory")
.map(|path| path.expect("Readable entry").path())
.flat_map(|path| {
if path.is_dir() {
read_dir_recursive(path)
} else {
Box::new(std::iter::once(path))
}
}),
)
}

/// Create a SessionContext, configured for the specific test
async fn context_for_test_file(file_name: &str) -> SessionContext {
match file_name {
async fn context_for_test_file(relative_path: &Path) -> SessionContext {
match relative_path.file_name().unwrap().to_str().unwrap() {
"aggregate.slt" | "select.slt" => {
info!("Registering aggregate tables");
let ctx = SessionContext::new();
Expand Down Expand Up @@ -185,14 +204,15 @@ impl Options {
/// To be compatible with this, treat the command line arguments as a
/// filter and that does a substring match on each input. returns
/// true f this path should be run
fn check_test_file(&self, path: &Path) -> bool {
fn check_test_file(&self, relative_path: &Path) -> bool {
if self.filters.is_empty() {
return true;
}

// otherwise check if any filter matches
let path_str = path.to_string_lossy();
self.filters.iter().any(|filter| path_str.contains(filter))
self.filters
.iter()
.any(|filter| relative_path.to_string_lossy().contains(filter))
}

/// Postgres runner executes only tests in files with specific names
Expand Down