From 76c2f064247a147e969e26388712939a420950f7 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sat, 28 Dec 2024 22:30:31 +0000 Subject: [PATCH 1/4] Add support for sqlite test files to sqllogictest --- datafusion-testing | 2 +- .../sqllogictest/src/engines/datafusion_engine/normalize.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion-testing b/datafusion-testing index e2e320c9477a6..5cc59ceceeebe 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304 +Subproject commit 5cc59ceceeebeea6b39861210b6d1cd27e66648a diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index ced497de22a75..58400280072c1 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -239,6 +239,10 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { let key = dict.normalized_keys()[row]; Ok(cell_to_string(dict.values(), key)?) } + // only added because of a bug in v 1.0.4 (is) of lexical-write-integer + DataType::Int64 => { + Ok(format!("{}", get_row_value!(array::Int64Array, col, row))) + } _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) From 397ed8e7cf2033b5f55979ade752694647828003 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sun, 29 Dec 2024 21:32:49 +0000 Subject: [PATCH 2/4] Removed workaround for bug that was fixed. --- .../sqllogictest/src/engines/datafusion_engine/normalize.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index 58400280072c1..ced497de22a75 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -239,10 +239,6 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { let key = dict.normalized_keys()[row]; Ok(cell_to_string(dict.values(), key)?) } - // only added because of a bug in v 1.0.4 (is) of lexical-write-integer - DataType::Int64 => { - Ok(format!("{}", get_row_value!(array::Int64Array, col, row))) - } _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) From 78b654fa5e698345cd7d7f7aa160cd0ec113a8be Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 31 Dec 2024 16:13:05 +0000 Subject: [PATCH 3/4] Refactor sqllogictest to extract postgres functionality into a separate file. Removed dependency on once_cell in favour of LazyLock. --- datafusion/sqllogictest/Cargo.toml | 2 - .../sqllogictest/bin/postgres_container.rs | 134 ++++++++++++++++ datafusion/sqllogictest/bin/sqllogictests.rs | 148 +----------------- .../src/engines/postgres_engine/mod.rs | 1 - 4 files changed, 142 insertions(+), 143 deletions(-) create mode 100644 datafusion/sqllogictest/bin/postgres_container.rs diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 1bb88a8bd44f1..3104846eda733 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -49,7 +49,6 @@ indicatif = "0.17" itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } -once_cell = { version = "1.20", optional = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } @@ -69,7 +68,6 @@ avro = ["datafusion/avro"] postgres = [ "bytes", "chrono", - "once_cell", "postgres-types", "postgres-protocol", "testcontainers", diff --git a/datafusion/sqllogictest/bin/postgres_container.rs b/datafusion/sqllogictest/bin/postgres_container.rs new file mode 100644 index 0000000000000..286d1ae2369c0 --- /dev/null +++ b/datafusion/sqllogictest/bin/postgres_container.rs @@ -0,0 +1,134 @@ +#![cfg(feature = "postgres")] + +use crate::Options; +use datafusion_common::Result; +use log::info; +use std::env::set_var; +use std::future::Future; +use std::sync::LazyLock; +use std::{env, thread}; +use testcontainers::core::IntoContainerPort; +use testcontainers::runners::AsyncRunner; +use testcontainers::ImageExt; +use testcontainers_modules::postgres; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::{mpsc, Mutex}; +use ContainerCommands::{FetchHost, FetchPort}; + +#[derive(Debug)] +pub enum ContainerCommands { + FetchHost, + FetchPort, + Stop, +} + +pub struct Channel { + pub tx: UnboundedSender, + pub rx: Mutex>, +} + +pub fn channel() -> Channel { + let (tx, rx) = mpsc::unbounded_channel(); + Channel { + tx, + rx: Mutex::new(rx), + } +} + +pub fn execute_blocking(f: F) { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(f); +} + +static POSTGRES_IN: LazyLock> = LazyLock::new(channel); +static POSTGRES_HOST: LazyLock> = LazyLock::new(channel); +static POSTGRES_PORT: LazyLock> = LazyLock::new(channel); +static POSTGRES_STOPPED: LazyLock> = LazyLock::new(channel); + +pub async fn initialize_postgres_container(options: &Options) -> Result<()> { + let start_pg_database = options.postgres_runner && !is_pg_uri_set(); + if start_pg_database { + info!("Starting postgres db ..."); + + thread::spawn(|| { + execute_blocking(start_postgres( + &POSTGRES_IN, + &POSTGRES_HOST, + &POSTGRES_PORT, + &POSTGRES_STOPPED, + )) + }); + + POSTGRES_IN.tx.send(FetchHost).unwrap(); + let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap(); + + POSTGRES_IN.tx.send(FetchPort).unwrap(); + let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap(); + + let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test"); + info!("Postgres uri is {pg_uri}"); + + set_var("PG_URI", pg_uri); + } else { + // close receiver + POSTGRES_IN.rx.lock().await.close(); + } + + Ok(()) +} + +pub async fn terminate_postgres_container() -> Result<()> { + if !POSTGRES_IN.tx.is_closed() { + println!("Stopping postgres db ..."); + POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(()); + POSTGRES_STOPPED.rx.lock().await.recv().await; + } + + Ok(()) +} + +async fn start_postgres( + in_channel: &Channel, + host_channel: &Channel, + port_channel: &Channel, + stopped_channel: &Channel<()>, +) { + info!("Starting postgres test container with user postgres/postgres and db test"); + + let container = postgres::Postgres::default() + .with_user("postgres") + .with_password("postgres") + .with_db_name("test") + .with_mapped_port(16432, 5432.tcp()) + .with_tag("17-alpine") + .start() + .await + .unwrap(); + // uncomment this if you are running docker in docker + let host = "host.docker.internal".to_string(); + // let host = container.get_host().await.unwrap().to_string(); + let port = container.get_host_port_ipv4(5432).await.unwrap(); + + let mut rx = in_channel.rx.lock().await; + while let Some(command) = rx.recv().await { + match command { + FetchHost => host_channel.tx.send(host.clone()).unwrap(), + FetchPort => port_channel.tx.send(port).unwrap(), + ContainerCommands::Stop => { + container.stop().await.unwrap(); + stopped_channel.tx.send(()).unwrap(); + rx.close(); + } + } + } +} + +fn is_pg_uri_set() -> bool { + match env::var("PG_URI") { + Ok(_) => true, + Err(_) => false, + } +} diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 498539c1674a1..f6b35bf3771c4 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -28,33 +28,21 @@ use indicatif::{ use itertools::Itertools; use log::Level::{Info, Warn}; use log::{info, log_enabled, warn}; -#[cfg(feature = "postgres")] -use once_cell::sync::Lazy; use sqllogictest::{ parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record, Validator, }; + #[cfg(feature = "postgres")] -use std::env::set_var; +use crate::postgres_container::{ + initialize_postgres_container, terminate_postgres_container, +}; use std::ffi::OsStr; use std::fs; -#[cfg(feature = "postgres")] -use std::future::Future; use std::path::{Path, PathBuf}; + #[cfg(feature = "postgres")] -use std::{env, thread}; -#[cfg(feature = "postgres")] -use testcontainers::core::IntoContainerPort; -#[cfg(feature = "postgres")] -use testcontainers::runners::AsyncRunner; -#[cfg(feature = "postgres")] -use testcontainers::ImageExt; -#[cfg(feature = "postgres")] -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -#[cfg(feature = "postgres")] -use tokio::sync::{mpsc, Mutex}; -#[cfg(feature = "postgres")] -use ContainerCommands::{FetchHost, FetchPort}; +mod postgres_container; const TEST_DIRECTORY: &str = "test_files/"; const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/"; @@ -170,31 +158,7 @@ async fn run_tests() -> Result<()> { options.warn_on_ignored(); #[cfg(feature = "postgres")] - let start_pg_database = options.postgres_runner && !is_pg_uri_set(); - #[cfg(feature = "postgres")] - if start_pg_database { - info!("Starting postgres db ..."); - - thread::spawn(|| { - execute_blocking(start_postgres( - &POSTGRES_IN, - &POSTGRES_HOST, - &POSTGRES_PORT, - &POSTGRES_STOPPED, - )) - }); - - POSTGRES_IN.tx.send(FetchHost).unwrap(); - let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap(); - - POSTGRES_IN.tx.send(FetchPort).unwrap(); - let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap(); - - let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test"); - info!("Postgres uri is {pg_uri}"); - - set_var("PG_URI", pg_uri); - } + initialize_postgres_container(&options).await?; // Run all tests in parallel, reporting failures at the end // @@ -277,11 +241,7 @@ async fn run_tests() -> Result<()> { m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?; #[cfg(feature = "postgres")] - if start_pg_database { - println!("Stopping postgres db ..."); - POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(()); - POSTGRES_STOPPED.rx.lock().await.recv().await; - } + terminate_postgres_container().await?; // report on any errors if !errors.is_empty() { @@ -294,14 +254,6 @@ async fn run_tests() -> Result<()> { } } -#[cfg(feature = "postgres")] -fn is_pg_uri_set() -> bool { - match env::var("PG_URI") { - Ok(_) => true, - Err(_) => false, - } -} - async fn run_test_file( test_file: TestFile, validator: Validator, @@ -758,87 +710,3 @@ impl Options { } } } - -#[cfg(feature = "postgres")] -pub async fn start_postgres( - in_channel: &Channel, - host_channel: &Channel, - port_channel: &Channel, - stopped_channel: &Channel<()>, -) { - info!("Starting postgres test container with user postgres/postgres and db test"); - - let container = testcontainers_modules::postgres::Postgres::default() - .with_user("postgres") - .with_password("postgres") - .with_db_name("test") - .with_mapped_port(16432, 5432.tcp()) - .with_tag("17-alpine") - .start() - .await - .unwrap(); - // uncomment this if you are running docker in docker - // let host = "host.docker.internal".to_string(); - let host = container.get_host().await.unwrap().to_string(); - let port = container.get_host_port_ipv4(5432).await.unwrap(); - - let mut rx = in_channel.rx.lock().await; - while let Some(command) = rx.recv().await { - match command { - FetchHost => host_channel.tx.send(host.clone()).unwrap(), - FetchPort => port_channel.tx.send(port).unwrap(), - ContainerCommands::Stop => { - container.stop().await.unwrap(); - stopped_channel.tx.send(()).unwrap(); - rx.close(); - } - } - } -} - -#[cfg(feature = "postgres")] -#[derive(Debug)] -pub enum ContainerCommands { - FetchHost, - FetchPort, - Stop, -} - -#[cfg(feature = "postgres")] -pub struct Channel { - pub tx: UnboundedSender, - pub rx: Mutex>, -} - -#[cfg(feature = "postgres")] -pub fn channel() -> Channel { - let (tx, rx) = mpsc::unbounded_channel(); - Channel { - tx, - rx: Mutex::new(rx), - } -} - -#[cfg(feature = "postgres")] -pub fn execute_blocking(f: F) { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(f); -} - -#[cfg(feature = "postgres")] -pub struct HostPort { - pub host: String, - pub port: u16, -} - -#[cfg(feature = "postgres")] -static POSTGRES_IN: Lazy> = Lazy::new(channel); -#[cfg(feature = "postgres")] -static POSTGRES_HOST: Lazy> = Lazy::new(channel); -#[cfg(feature = "postgres")] -static POSTGRES_PORT: Lazy> = Lazy::new(channel); -#[cfg(feature = "postgres")] -static POSTGRES_STOPPED: Lazy> = Lazy::new(channel); diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 1439695d62c6b..6391f666b422a 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -215,7 +215,6 @@ fn no_quotes(t: &str) -> &str { fn schema_name(relative_path: &Path) -> String { relative_path .to_string_lossy() - .to_string() .chars() .filter(|ch| ch.is_ascii_alphanumeric()) .collect::() From 23dec4a0793ba05394e3c399c65d140b782a16e4 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 31 Dec 2024 20:23:08 +0000 Subject: [PATCH 4/4] Add missing license header. --- .../sqllogictest/bin/postgres_container.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/sqllogictest/bin/postgres_container.rs b/datafusion/sqllogictest/bin/postgres_container.rs index 286d1ae2369c0..210b9b3e361c9 100644 --- a/datafusion/sqllogictest/bin/postgres_container.rs +++ b/datafusion/sqllogictest/bin/postgres_container.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #![cfg(feature = "postgres")] use crate::Options;