diff --git a/Cargo.lock b/Cargo.lock index 9f9263e529034..d6c759b8e71d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2592,7 +2592,10 @@ dependencies = [ "datafusion-physical-plan", "datafusion-sql", "getrandom 0.2.15", + "insta", + "object_store", "tokio", + "url", "wasm-bindgen", "wasm-bindgen-test", ] diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index 94515c6754a7a..10eab025734c9 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -58,5 +58,8 @@ getrandom = { version = "0.2.8", features = ["js"] } wasm-bindgen = "0.2.99" [dev-dependencies] +insta = { workspace = true } +object_store = { workspace = true } tokio = { workspace = true } +url = { workspace = true } wasm-bindgen-test = "0.3.49" diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index df0d9d6cbf37e..6c7be9056eb43 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -82,6 +82,7 @@ pub fn basic_parse() { #[cfg(test)] mod test { use super::*; + use datafusion::execution::options::ParquetReadOptions; use datafusion::{ arrow::{ array::{ArrayRef, Int32Array, RecordBatch, StringArray}, @@ -90,12 +91,16 @@ mod test { datasource::MemTable, execution::context::SessionContext, }; + use datafusion_common::test_util::batches_to_string; use datafusion_execution::{ config::SessionConfig, disk_manager::DiskManagerConfig, runtime_env::RuntimeEnvBuilder, }; use datafusion_physical_plan::collect; use datafusion_sql::parser::DFParser; + use insta::assert_snapshot; + use object_store::{memory::InMemory, path::Path, ObjectStore}; + use url::Url; use wasm_bindgen_test::wasm_bindgen_test; wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); @@ -115,6 +120,22 @@ mod test { let session_config = SessionConfig::new().with_target_partitions(1); Arc::new(SessionContext::new_with_config_rt(session_config, rt)) } + + fn create_test_data() -> (Arc, RecordBatch) { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ])); + + let data: Vec = vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ]; + + let batch = RecordBatch::try_new(schema.clone(), data).unwrap(); + (schema, batch) + } + #[wasm_bindgen_test(unsupported = tokio::test)] async fn basic_execute() { let sql = "SELECT 2 + 2;"; @@ -185,17 +206,22 @@ mod test { #[wasm_bindgen_test(unsupported = tokio::test)] async fn test_parquet_write() { - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("value", DataType::Utf8, false), - ])); + let (schema, batch) = create_test_data(); + let mut buffer = Vec::new(); + let mut writer = datafusion::parquet::arrow::ArrowWriter::try_new( + &mut buffer, + schema.clone(), + None, + ) + .unwrap(); - let data: Vec = vec![ - Arc::new(Int32Array::from(vec![1])), - Arc::new(StringArray::from(vec!["a"])), - ]; + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } - let batch = RecordBatch::try_new(schema.clone(), data).unwrap(); + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_parquet_read_and_write() { + let (schema, batch) = create_test_data(); let mut buffer = Vec::new(); let mut writer = datafusion::parquet::arrow::ArrowWriter::try_new( &mut buffer, @@ -203,8 +229,33 @@ mod test { None, ) .unwrap(); - writer.write(&batch).unwrap(); writer.close().unwrap(); + + let session_ctx = SessionContext::new(); + let store = InMemory::new(); + + let path = Path::from("a.parquet"); + store.put(&path, buffer.into()).await.unwrap(); + + let url = Url::parse("memory://").unwrap(); + session_ctx.register_object_store(&url, Arc::new(store)); + + let df = session_ctx + .read_parquet("memory:///", ParquetReadOptions::new()) + .await + .unwrap(); + + let result = df.collect().await.unwrap(); + + assert_snapshot!(batches_to_string(&result), @r" + +----+-------+ + | id | value | + +----+-------+ + | 1 | a | + | 2 | b | + | 3 | c | + +----+-------+ + "); } }