From 4314a3b67a7071aa3eee6df28b954fd62b52dc2a Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 18 May 2022 14:13:14 +0100
Subject: [PATCH 01/13] Extract Listing URI logic into ListingTableUrl structure
---
 benchmarks/src/bin/tpch.rs | 4 +-
 datafusion-examples/examples/flight_server.rs | 5 +-
 datafusion/core/Cargo.toml | 3 +
 .../core/benches/sort_limit_query_sql.rs | 7 +-
 datafusion/core/src/catalog/schema.rs | 23 +-
 .../core/src/datasource/listing/helpers.rs | 219 ++++++----------
 datafusion/core/src/datasource/listing/mod.rs | 2 +
 .../core/src/datasource/listing/path.rs | 234 ++++++++++++++++++
 .../core/src/datasource/listing/table.rs | 65 ++---
 .../src/datasource/object_store_registry.rs | 47 ++--
 datafusion/core/src/execution/context.rs | 71 +++---
 datafusion/core/src/execution/runtime_env.rs | 6 +-
 datafusion/core/src/lib.rs | 10 +-
 datafusion/core/src/physical_plan/mod.rs | 5 +-
 datafusion/core/src/test/object_store.rs | 2 +-
 datafusion/core/src/test_util.rs | 14 +-
 datafusion/core/tests/path_partition.rs | 16 +-
 datafusion/core/tests/sql/explain_analyze.rs | 7 +-
 datafusion/core/tests/sql/json.rs | 2 +-
 datafusion/core/tests/sql/mod.rs | 5 +-
 datafusion/data-access/Cargo.toml | 3 +-
 .../data-access/src/object_store/local.rs | 26 --
 .../data-access/src/object_store/mod.rs | 195 +---------------
 23 files changed, 463 insertions(+), 508 deletions(-)
 create mode 100644 datafusion/core/src/datasource/listing/path.rs

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index ba10d51c2e531..c7ae7b7e5bf51 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -50,6 +50,7 @@ use datafusion::{

 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
 use serde::Serialize;
 use structopt::StructOpt;

@@ -425,7 +426,8 @@ fn get_table(
         table_partition_cols: vec![],
     };

-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+    let uri = ListingTableUrl::parse(path)?;
+    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri)
         .with_listing_options(options)
         .with_schema(schema);

diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 703cb702546fe..d7f07b3cac538 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use arrow_flight::SchemaAsIpc;
 use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
 use futures::Stream;
 use tonic::transport::Server;
 use tonic::{Request, Response, Status, Streaming};
@@ -68,9 +68,10 @@ impl FlightService for FlightServiceImpl {
         let request = request.into_inner();

         let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let url = ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;

         let schema = listing_options
-            .infer_schema(Arc::new(LocalFileSystem {}), &request.path[0])
+            .infer_schema(Arc::new(LocalFileSystem {}), &url)
             .await
             .unwrap();

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f8170443e2135..08da53f9c5588 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -67,7 +67,9 @@
datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" } datafusion-row = { path = "../row", version = "8.0.0" } datafusion-sql = { path = "../sql", version = "8.0.0" } futures = "0.3" +glob = "0.3.0" hashbrown = { version = "0.12", features = ["raw"] } +itertools = "0.10" lazy_static = { version = "^1.4.0" } log = "^0.4" num-traits = { version = "0.2", optional = true } @@ -85,6 +87,7 @@ sqlparser = "0.17" tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } tokio-stream = "0.1" +url = "2.2" uuid = { version = "1.0", features = ["v4"] } [dev-dependencies] diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs index faeff6bc9c690..0b9905ba56d04 100644 --- a/datafusion/core/benches/sort_limit_query_sql.rs +++ b/datafusion/core/benches/sort_limit_query_sql.rs @@ -20,7 +20,9 @@ extern crate criterion; use criterion::Criterion; use datafusion::datafusion_data_access::object_store::local::LocalFileSystem; use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +}; use parking_lot::Mutex; use std::sync::Arc; @@ -64,11 +66,12 @@ fn create_context() -> Arc> { let testdata = datafusion::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); + let url = ListingTableUrl::parse(path).unwrap(); // create CSV data source let listing_options = ListingOptions::new(Arc::new(CsvFormat::default())); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path) + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url) .with_listing_options(listing_options) .with_schema(schema); diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 9ef5c3d67895b..be690470fe9e9 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -23,7 +23,7 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; -use crate::datasource::listing::{ListingTable, ListingTableConfig}; +use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; use crate::datasource::object_store_registry::ObjectStoreRegistry; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -157,10 +157,7 @@ impl ObjectStoreSchemaProvider { } /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { + pub fn object_store(&self, uri: &ListingTableUrl) -> Result> { self.object_store_registry .lock() .get_by_uri(uri) @@ -173,13 +170,13 @@ impl ObjectStoreSchemaProvider { pub async fn register_listing_table( &self, name: &str, - uri: &str, + uri: ListingTableUrl, config: Option, ) -> Result<()> { let config = match config { Some(cfg) => cfg, None => { - let (object_store, _path) = self.object_store(uri)?; + let object_store = self.object_store(&uri)?; ListingTableConfig::new(object_store, uri).infer().await? 
} }; @@ -255,6 +252,7 @@ mod tests { use crate::datasource::empty::EmptyTable; use crate::execution::context::SessionContext; + use crate::datasource::listing::ListingTableUrl; use futures::StreamExt; #[tokio::test] @@ -280,12 +278,13 @@ mod tests { async fn test_schema_register_listing_table() { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let uri = ListingTableUrl::parse(filename).unwrap(); let schema = ObjectStoreSchemaProvider::new(); let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); schema - .register_listing_table("alltypes_plain", &filename, None) + .register_listing_table("alltypes_plain", uri, None) .await .unwrap(); @@ -338,8 +337,9 @@ mod tests { || file == OsStr::new("alltypes_plain.parquet") { let name = path.file_stem().unwrap().to_str().unwrap(); + let path = ListingTableUrl::parse(&sized_file.path).unwrap(); schema - .register_listing_table(name, &sized_file.path, None) + .register_listing_table(name, path, None) .await .unwrap(); } @@ -360,17 +360,18 @@ mod tests { async fn test_schema_register_same_listing_table() { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let uri = ListingTableUrl::parse(filename).unwrap(); let schema = ObjectStoreSchemaProvider::new(); let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); schema - .register_listing_table("alltypes_plain", &filename, None) + .register_listing_table("alltypes_plain", uri.clone(), None) .await .unwrap(); schema - .register_listing_table("alltypes_plain", &filename, None) + .register_listing_table("alltypes_plain", uri, None) .await .unwrap(); } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 11a91f2eeaa86..330517563e580 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -17,7 +17,6 @@ //! Helper functions for the table implementation -use std::path::{Component, Path}; use std::sync::Arc; use arrow::{ @@ -29,11 +28,7 @@ use arrow::{ record_batch::RecordBatch, }; use chrono::{TimeZone, Utc}; -use datafusion_common::DataFusionError; -use futures::{ - stream::{self}, - StreamExt, TryStreamExt, -}; +use futures::{stream::BoxStream, TryStreamExt}; use log::debug; use crate::{ @@ -44,7 +39,8 @@ use crate::{ scalar::ScalarValue, }; -use super::{PartitionedFile, PartitionedFileStream}; +use super::PartitionedFile; +use crate::datasource::listing::ListingTableUrl; use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile}; use datafusion_expr::Volatility; @@ -161,94 +157,53 @@ pub fn split_files( /// TODO for tables with many files (10k+), it will usually more efficient /// to first list the folders relative to the first partition dimension, /// prune those, then list only the contain of the remaining folders. 
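As background for the rewrite that follows: the pruning logic keys off hive-style `key=value` path segments. A minimal self-contained sketch of that mapping (not part of the patch; the function name and example paths are illustrative, mirroring `parse_partitions_for_path` further down in this file):

```rust
// Sketch only: how "key=value" path segments under a table root map to
// partition values. For partition columns ["date"], a segment iterator over
// "date=2021-10-27/file.parquet" yields Some(vec!["2021-10-27"]).
fn parse_partition_values<'a>(
    segments: impl Iterator<Item = &'a str>,
    partition_cols: &[String],
) -> Option<Vec<&'a str>> {
    let mut values = Vec::new();
    for (segment, col) in segments.zip(partition_cols) {
        match segment.split_once('=') {
            // Segment encodes the expected column, e.g. "date=2021-10-27"
            Some((name, value)) if name == col => values.push(value),
            // Any mismatch means the file does not follow this partitioning
            _ => return None,
        }
    }
    Some(values)
}
```

This pairing by position is why the new `ListingTableUrl::strip_prefix` introduced below returns an iterator of path segments rather than a raw string suffix.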
-pub async fn pruned_partition_list( - store: &dyn ObjectStore, - table_path: &str, +pub async fn pruned_partition_list<'a>( + store: &'a dyn ObjectStore, + table_path: &'a ListingTableUrl, filters: &[Expr], - file_extension: &str, - table_partition_cols: &[String], -) -> Result { + file_extension: &'a str, + table_partition_cols: &'a [String], +) -> Result>> { + let list = table_path.list_all_files(store, file_extension); + // if no partition col => simply list all the files if table_partition_cols.is_empty() { - return Ok(Box::pin( - store - .glob_file_with_suffix(table_path, file_extension) - .await? - .map(|f| { - Ok(PartitionedFile { - partition_values: vec![], - file_meta: f?, - range: None, - }) - }), - )); + return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into()))); } let applicable_filters: Vec<_> = filters .iter() .filter(|f| expr_applicable_for_cols(table_partition_cols, f)) .collect(); - let stream_path = table_path.to_owned(); + if applicable_filters.is_empty() { // Parse the partition values while listing all the files // Note: We might avoid parsing the partition values if they are not used in any projection, // but the cost of parsing will likely be far dominated by the time to fetch the listing from // the object store. - let table_partition_cols_stream = table_partition_cols.to_vec(); - Ok(Box::pin( - store - .glob_file_with_suffix(table_path, file_extension) - .await? - .filter_map(move |f| { - let stream_path = stream_path.clone(); - let table_partition_cols_stream = table_partition_cols_stream.clone(); - async move { - let file_meta = match f { - Ok(fm) => fm, - Err(err) => return Some(Err(err)), - }; - let parsed_path = parse_partitions_for_path( - &stream_path, - file_meta.path(), - &table_partition_cols_stream, - ) - .map(|p| { - p.iter() - .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) - .collect() - }); - - parsed_path.map(|partition_values| { - Ok(PartitionedFile { - partition_values, - file_meta, - range: None, - }) - }) - } - }), - )) + Ok(Box::pin(list.try_filter_map(move |file_meta| async move { + let parsed_path = parse_partitions_for_path( + table_path, + file_meta.path(), + table_partition_cols, + ) + .map(|p| { + p.iter() + .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) + .collect() + }); + + Ok(parsed_path.map(|partition_values| PartitionedFile { + partition_values, + file_meta, + range: None, + })) + }))) } else { // parse the partition values and serde them as a RecordBatch to filter them - // TODO avoid collecting but have a streaming memory table instead - let batches: Vec = store - .glob_file_with_suffix(table_path, file_extension) - .await? - // TODO we set an arbitrary high batch size here, it does not matter as we list - // all the files anyway. This number will need to be adjusted according to the object - // store if we switch to a streaming-stlye pruning of the files. For instance S3 lists - // 1000 items at a time so batches of 1000 would be ideal with S3 as store. 
- .chunks(1024) - .map(|v| { - v.into_iter() - .collect::>>() - }) - .map_err(DataFusionError::IoError) - .map(move |metas| paths_to_batch(table_partition_cols, &stream_path, &metas?)) - .try_collect() - .await?; - - let mem_table = MemTable::try_new(batches[0].schema(), vec![batches])?; + let metas: Vec<_> = list.try_collect().await?; + let batch = paths_to_batch(table_partition_cols, table_path, &metas)?; + let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; // Filter the partitions using a local datafusion context // TODO having the external context would allow us to resolve `Volatility::Stable` @@ -260,7 +215,7 @@ pub async fn pruned_partition_list( } let filtered_batches = df.collect().await?; - Ok(Box::pin(stream::iter( + Ok(Box::pin(futures::stream::iter( batches_to_paths(&filtered_batches).into_iter().map(Ok), ))) } @@ -275,7 +230,7 @@ pub async fn pruned_partition_list( /// Note: For the last modified date, this looses precisions higher than millisecond. fn paths_to_batch( table_partition_cols: &[String], - table_path: &str, + table_path: &ListingTableUrl, metas: &[FileMeta], ) -> Result { let mut key_builder = StringBuilder::new(metas.len()); @@ -373,21 +328,15 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec { /// Extract the partition values for the given `file_path` (in the given `table_path`) /// associated to the partitions defined by `table_partition_cols` fn parse_partitions_for_path<'a>( - table_path: &str, + table_path: &ListingTableUrl, file_path: &'a str, table_partition_cols: &[String], ) -> Option> { - let subpath = file_path.strip_prefix(table_path)?; - - // split subpath into components ignoring leading separator if exists - let subpath = Path::new(subpath) - .components() - .filter(|c| !matches!(c, Component::RootDir)) - .filter_map(|c| c.as_os_str().to_str()); + let subpath = table_path.strip_prefix(file_path)?; let mut part_values = vec![]; - for (path, pn) in subpath.zip(table_partition_cols) { - match path.split_once('=') { + for (part, pn) in subpath.zip(table_partition_cols) { + match part.split_once('=') { Some((name, val)) if name == pn => part_values.push(val), _ => return None, } @@ -401,6 +350,7 @@ mod tests { logical_plan::{case, col, lit}, test::object_store::TestObjectStore, }; + use futures::StreamExt; use super::*; @@ -453,7 +403,7 @@ mod tests { let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( store.as_ref(), - "tablepath/", + &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], ".parquet", &[String::from("mypartition")], @@ -476,33 +426,34 @@ mod tests { let filter = Expr::eq(col("mypartition"), lit("val1")); let pruned = pruned_partition_list( store.as_ref(), - "tablepath/", + &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], ".parquet", &[String::from("mypartition")], ) .await .expect("partition pruning failed") - .collect::>() - .await; + .try_collect::>() + .await + .unwrap(); assert_eq!(pruned.len(), 2); - let f1 = pruned[0].as_ref().expect("first item not an error"); + let f1 = &pruned[0]; assert_eq!( - &f1.file_meta.sized_file.path, + f1.file_meta.path(), "tablepath/mypartition=val1/file.parquet" ); assert_eq!( &f1.partition_values, &[ScalarValue::Utf8(Some(String::from("val1"))),] ); - let f2 = pruned[1].as_ref().expect("second item not an error"); + let f2 = &pruned[1]; assert_eq!( - &f2.file_meta.sized_file.path, + f2.file_meta.path(), "tablepath/mypartition=val1/other=val3/file.parquet" ); assert_eq!( - &f2.partition_values, + 
f2.partition_values, &[ScalarValue::Utf8(Some(String::from("val1"))),] ); } @@ -522,20 +473,21 @@ mod tests { let filter3 = Expr::eq(col("part2"), col("other")); let pruned = pruned_partition_list( store.as_ref(), - "tablepath/", + &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter1, filter2, filter3], ".parquet", &[String::from("part1"), String::from("part2")], ) .await .expect("partition pruning failed") - .collect::>() - .await; + .try_collect::>() + .await + .unwrap(); assert_eq!(pruned.len(), 2); - let f1 = pruned[0].as_ref().expect("first item not an error"); + let f1 = &pruned[0]; assert_eq!( - &f1.file_meta.sized_file.path, + f1.file_meta.path(), "tablepath/part1=p1v2/part2=p2v1/file1.parquet" ); assert_eq!( @@ -545,9 +497,9 @@ mod tests { ScalarValue::Utf8(Some(String::from("p2v1"))) ] ); - let f2 = pruned[1].as_ref().expect("second item not an error"); + let f2 = &pruned[1]; assert_eq!( - &f2.file_meta.sized_file.path, + f2.file_meta.path(), "tablepath/part1=p1v2/part2=p2v1/file2.parquet" ); assert_eq!( @@ -563,12 +515,16 @@ mod tests { fn test_parse_partitions_for_path() { assert_eq!( Some(vec![]), - parse_partitions_for_path("bucket/mytable", "bucket/mytable/file.csv", &[]) + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + "bucket/mytable/file.csv", + &[] + ) ); assert_eq!( None, parse_partitions_for_path( - "bucket/othertable", + &ListingTableUrl::parse("file:///bucket/othertable").unwrap(), "bucket/mytable/file.csv", &[] ) @@ -576,7 +532,7 @@ mod tests { assert_eq!( None, parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/file.csv", &[String::from("mypartition")] ) @@ -584,7 +540,7 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/mypartition=v1/file.csv", &[String::from("mypartition")] ) @@ -592,7 +548,7 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket/mytable/", + &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(), "bucket/mytable/mypartition=v1/file.csv", &[String::from("mypartition")] ) @@ -601,7 +557,7 @@ mod tests { assert_eq!( None, parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/v1/file.csv", &[String::from("mypartition")] ) @@ -609,7 +565,7 @@ mod tests { assert_eq!( Some(vec!["v1", "v2"]), parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv", &[String::from("mypartition"), String::from("otherpartition")] ) @@ -617,34 +573,13 @@ mod tests { assert_eq!( Some(vec!["v1"]), parse_partitions_for_path( - "bucket/mytable", + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv", &[String::from("mypartition")] ) ); } - #[cfg(target_os = "windows")] - #[test] - fn test_parse_partitions_for_path_windows() { - assert_eq!( - Some(vec!["v1"]), - parse_partitions_for_path( - "bucket\\mytable", - "bucket\\mytable\\mypartition=v1\\file.csv", - &[String::from("mypartition")] - ) - ); - assert_eq!( - Some(vec!["v1", "v2"]), - parse_partitions_for_path( - "bucket\\mytable", - "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv", - &[String::from("mypartition"), String::from("otherpartition")] - ) - ); - } - #[test] fn 
test_path_batch_roundtrip_no_partiton() { let files = vec![ @@ -664,7 +599,8 @@ mod tests { }, ]; - let batches = paths_to_batch(&[], "mybucket/tablepath", &files) + let table_path = ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(); + let batches = paths_to_batch(&[], &table_path, &files) .expect("Serialization of file list to batch failed"); let parsed_files = batches_to_paths(&[batches]); @@ -698,9 +634,12 @@ mod tests { }, ]; - let batches = - paths_to_batch(&[String::from("part1")], "mybucket/tablepath", &files) - .expect("Serialization of file list to batch failed"); + let batches = paths_to_batch( + &[String::from("part1")], + &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(), + &files, + ) + .expect("Serialization of file list to batch failed"); let parsed_files = batches_to_paths(&[batches]); assert_eq!(parsed_files.len(), 2); diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 0f0a7d20ee482..a4b68f45f0d3f 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -19,6 +19,7 @@ //! to get the list of files to process. mod helpers; +mod path; mod table; use datafusion_common::ScalarValue; @@ -26,6 +27,7 @@ use datafusion_data_access::{FileMeta, Result, SizedFile}; use futures::Stream; use std::pin::Pin; +pub use path::ListingTableUrl; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/path.rs new file mode 100644 index 0000000000000..dcb179d52f171 --- /dev/null +++ b/datafusion/core/src/datasource/listing/path.rs @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::{DataFusionError, Result}; +use datafusion_data_access::object_store::ObjectStore; +use datafusion_data_access::FileMeta; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use glob::Pattern; +use url::Url; + +/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`] +/// for more information on the supported expressions +#[derive(Debug, Clone)] +pub struct ListingTableUrl { + /// A URL that identifies a file or directory to list files from + url: Url, + /// An optional glob expression used to filter files + glob: Option, +} + +impl ListingTableUrl { + /// Parse a provided string as a `ListingTableUrl` + /// + /// # Paths without a Scheme + /// + /// If no scheme is provided, the string will be interpreted as a + /// path on the local filesystem, using the operating system's + /// standard path delimiter - i.e. `\` on Windows, `/` on Unix. 
+ /// + /// If the path contains any of `'?', '*', '['`, it will be considered + /// a glob expression and resolved as described in the section below. + /// + /// Otherwise, the path will be resolved to an absolute path, returning + /// an error if it does not exist, and converted to a [file URI] + /// + /// If you wish to specify a path that does not exist on the local + /// machine you must provide it as a fully-qualified [file URI] + /// e.g. `file:///myfile.txt` + /// + /// ## Glob Paths + /// + /// If no scheme is provided, and the path contains a glob expression, it will + /// be resolved as follows. + /// + /// The string up to the first path segment containing a glob expression will be extracted, + /// and resolved in the same manner as a normal scheme-less path. That is, resolved to + /// an absolute path on the local filesystem, returning an error if it does not exist, + /// and converted to a [file URI] + /// + /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a + /// filter when listing files from object storage + /// + /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme + pub fn parse(s: impl AsRef) -> Result { + let s = s.as_ref(); + Ok(match Url::parse(s) { + Ok(url) => Self { url, glob: None }, + Err(url::ParseError::RelativeUrlWithoutBase) => { + let (prefix, glob) = match split_glob_expression(s) { + Some((prefix, glob)) => { + let glob = Pattern::new(glob) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + (prefix, Some(glob)) + } + None => (s, None), + }; + + let path = std::path::Path::new(prefix).canonicalize()?; + let url = match path.is_file() { + true => Url::from_file_path(path).unwrap(), + false => Url::from_directory_path(path).unwrap(), + }; + + Self { url, glob } + } + Err(e) => return Err(DataFusionError::External(Box::new(e))), + }) + } + + /// Returns the URL scheme + pub fn scheme(&self) -> &str { + self.url.scheme() + } + + /// Returns the path as expected by [`ObjectStore`] + /// + /// In particular for file scheme URLs, this has a leading `/` + /// and describes an absolute path on the local filesystem + /// + /// For other URLs, this also contains the host component + /// and lacks a leading `/` + /// + /// TODO: Handle paths consistently (#2489) + fn prefix(&self) -> &str { + match self.scheme() { + "file" => self.url.path(), + _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath], + } + } + + /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning + /// an iterator of the remaining path segments + pub(crate) fn strip_prefix<'a, 'b: 'a>( + &'a self, + path: &'b str, + ) -> Option + 'a> { + // Ignore empty path segments + let diff = itertools::diff_with( + path.split('/').filter(|s| !s.is_empty()), + self.prefix().split('/').filter(|s| !s.is_empty()), + |a, b| a == b, + ); + + match diff { + // Match with remaining + Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath), + _ => None, + } + } + + /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension` + pub(crate) fn list_all_files<'a>( + &'a self, + store: &'a dyn ObjectStore, + file_extension: &'a str, + ) -> BoxStream<'a, Result> { + futures::stream::once(store.list_file(self.prefix())) + .try_flatten() + .map_err(DataFusionError::IoError) + .try_filter(move |meta| { + let path = meta.path(); + + let extension_match = path.ends_with(file_extension); + let glob_match = match &self.glob { + Some(glob) => match path.strip_prefix(self.url.path()) { + Some(stripped) => 
glob.matches(stripped), + None => false, + }, + None => true, + }; + + futures::future::ready(extension_match && glob_match) + }) + .boxed() + } +} + +impl std::fmt::Display for ListingTableUrl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.url) + } +} + +const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; + +/// Splits `path` at the first path segment containing a glob expression, returning +/// `None` if no glob expression found. +/// +/// Path delimiters are determined using [`std::path::is_separator`] which +/// permits `/` as a path delimiter even on Windows platforms. +/// +fn split_glob_expression(path: &str) -> Option<(&str, &str)> { + let mut last_separator = 0; + + for (byte_idx, char) in path.char_indices() { + if GLOB_START_CHARS.contains(&char) { + if last_separator == 0 { + return Some((".", path)); + } + return Some(path.split_at(last_separator)); + } + + if std::path::is_separator(char) { + last_separator = byte_idx + char.len_utf8(); + } + } + None +} + +#[cfg(test)] +mod tests { + use crate::datasource::listing::path::split_glob_expression; + + #[tokio::test] + async fn test_split_glob() { + fn test(input: &str, expected: Option<(&str, &str)>) { + assert_eq!( + split_glob_expression(input), + expected, + "testing split_glob_expression with {}", + input + ); + } + + // no glob patterns + test("/", None); + test("/a.txt", None); + test("/a", None); + test("/a/", None); + test("/a/b", None); + test("/a/b/", None); + test("/a/b.txt", None); + test("/a/b/c.txt", None); + // glob patterns, thus we build the longest path (os-specific) + test("*.txt", Some((".", "*.txt"))); + test("/*.txt", Some(("/", "*.txt"))); + test("/a/*b.txt", Some(("/a/", "*b.txt"))); + test("/a/*/b.txt", Some(("/a/", "*/b.txt"))); + test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt"))); + test("/a/b*.txt", Some(("/a/", "b*.txt"))); + test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt"))); + + // https://github.com/apache/arrow-datafusion/issues/2465 + test( + "/a/b/c//alltypes_plain*.parquet", + Some(("/a/b/c//", "alltypes_plain*.parquet")), + ); + } +} diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 1dceb8b35e198..6bb9d3c37183d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -28,7 +28,9 @@ use crate::datasource::{ avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, FileFormat, }, - get_statistics_with_limit, TableProvider, TableType, + get_statistics_with_limit, + listing::ListingTableUrl, + TableProvider, TableType, }; use crate::logical_expr::TableProviderFilterPushDown; use crate::{ @@ -51,7 +53,7 @@ pub struct ListingTableConfig { /// `ObjectStore` that contains the files for the `ListingTable`. pub object_store: Arc, /// Path on the `ObjectStore` for creating `ListingTable`. - pub table_path: String, + pub table_path: ListingTableUrl, /// Optional `SchemaRef` for the to be created `ListingTable`. pub file_schema: Option, /// Optional `ListingOptions` for the to be created `ListingTable`. @@ -60,13 +62,10 @@ pub struct ListingTableConfig { impl ListingTableConfig { /// Creates new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`. 
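For orientation, a minimal usage sketch of the reworked constructor chain (not part of the patch; the path argument is a placeholder, and the flow mirrors the `load_table` test helper later in this diff):

```rust
// Sketch only: constructing a ListingTable through the new URL-based API.
use std::sync::Arc;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};

async fn open_table(path: &str) -> datafusion::error::Result<ListingTable> {
    // Scheme-less paths are canonicalized into file:// URLs; fully-qualified
    // URLs such as "s3://bucket/prefix/" are passed through unchanged.
    let url = ListingTableUrl::parse(path)?;
    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
        .infer() // infers both ListingOptions and the schema from the file suffix
        .await?;
    ListingTable::try_new(config)
}
```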
- pub fn new( - object_store: Arc, - table_path: impl Into, - ) -> Self { + pub fn new(object_store: Arc, table_path: ListingTableUrl) -> Self { Self { object_store, - table_path: table_path.into(), + table_path, file_schema: None, options: None, } @@ -106,18 +105,18 @@ impl ListingTableConfig { /// Infer `ListingOptions` based on `table_path` suffix. pub async fn infer_options(self) -> Result { - let mut files = self.object_store.list_file(&self.table_path).await?; - let file = files + let file = self + .table_path + .list_all_files(self.object_store.as_ref(), "") .next() .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let tokens: Vec<&str> = file.path().split('.').collect(); - let file_type = tokens.last().ok_or_else(|| { + let file_type = file.path().rsplit('.').next().ok_or_else(|| { DataFusionError::Internal("Unable to infer file suffix".into()) })?; - let format = ListingTableConfig::infer_format(*file_type)?; + let format = ListingTableConfig::infer_format(file_type)?; let listing_options = ListingOptions { format, @@ -140,7 +139,7 @@ impl ListingTableConfig { match self.options { Some(options) => { let schema = options - .infer_schema(self.object_store.clone(), self.table_path.as_str()) + .infer_schema(self.object_store.clone(), &self.table_path) .await?; Ok(Self { @@ -213,10 +212,9 @@ impl ListingOptions { pub async fn infer_schema<'a>( &'a self, store: Arc, - path: &'a str, + table_path: &'a ListingTableUrl, ) -> Result { - let extension = &self.file_extension; - let list_stream = store.glob_file_with_suffix(path, extension).await?; + let list_stream = table_path.list_all_files(store.as_ref(), &self.file_extension); let files: Vec<_> = list_stream.try_collect().await?; self.format.infer_schema(&store, &files).await } @@ -226,7 +224,7 @@ impl ListingOptions { /// or file system listing capability to get the list of files. 
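In practice most callers will not build a `ListingTable` by hand; a rough sketch of the equivalent registration through `SessionContext` (the table name and path are placeholders; see the `register_listing_table` changes in context.rs below):

```rust
// Sketch only: registering a listing table via the string-based entry point,
// which now parses the uri through ListingTableUrl internally.
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::SessionContext;

async fn register(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    // Both "file:///data/table/" and a plain relative path are accepted;
    // passing None for the schema triggers schema inference.
    ctx.register_listing_table("mytable", "file:///data/table/", options, None)
        .await
}
```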
pub struct ListingTable { object_store: Arc, - table_path: String, + table_path: ListingTableUrl, /// File fields only file_schema: SchemaRef, /// File fields + partition columns @@ -276,9 +274,10 @@ impl ListingTable { pub fn object_store(&self) -> &Arc { &self.object_store } + /// Get path ref - pub fn table_path(&self) -> &str { - &self.table_path + pub fn table_path(&self) -> String { + self.table_path.to_string() } /// Get options ref pub fn options(&self) -> &ListingOptions { @@ -369,6 +368,7 @@ impl ListingTable { .await?; // collect the statistics if required by the config + // TODO: Collect statistics and schema in single-pass let object_store = Arc::clone(&self.object_store); let files = file_list.then(move |part_file| { let object_store = object_store.clone(); @@ -436,11 +436,10 @@ mod tests { async fn load_table_stats_by_default() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let uri = ListingTableUrl::parse(filename).unwrap(); let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); - let schema = opt - .infer_schema(Arc::new(LocalFileSystem {}), &filename) - .await?; - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename) + let schema = opt.infer_schema(Arc::new(LocalFileSystem {}), &uri).await?; + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri) .with_listing_options(opt) .with_schema(schema); let table = ListingTable::try_new(config)?; @@ -464,9 +463,10 @@ mod tests { collect_stat: true, }; + let table_path = ListingTableUrl::parse("file:///table/").unwrap(); let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)])); - let config = ListingTableConfig::new(store, "table/") + let config = ListingTableConfig::new(store, table_path) .with_listing_options(opt) .with_schema(file_schema); let table = ListingTable::try_new(config)?; @@ -504,7 +504,7 @@ mod tests { "bucket/key-prefix/file3", "bucket/key-prefix/file4", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 12, 5, ) @@ -518,7 +518,7 @@ mod tests { "bucket/key-prefix/file2", "bucket/key-prefix/file3", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 4, 4, ) @@ -533,14 +533,15 @@ mod tests { "bucket/key-prefix/file3", "bucket/key-prefix/file4", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 2, 2, ) .await?; // no files => no groups - assert_list_files_for_scan_grouping(&[], "bucket/key-prefix/", 2, 0).await?; + assert_list_files_for_scan_grouping(&[], "file:///bucket/key-prefix/", 2, 0) + .await?; // files that don't match the prefix assert_list_files_for_scan_grouping( @@ -549,7 +550,7 @@ mod tests { "bucket/key-prefix/file1", "bucket/other-prefix/roguefile", ], - "bucket/key-prefix/", + "file:///bucket/key-prefix/", 10, 2, ) @@ -560,7 +561,8 @@ mod tests { async fn load_table(name: &str) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), filename) + let uri = ListingTableUrl::parse(filename).unwrap(); + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri) .infer() .await?; let table = ListingTable::try_new(config)?; @@ -590,7 +592,8 @@ mod tests { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - let config = ListingTableConfig::new(mock_store, table_prefix.to_owned()) + let uri = ListingTableUrl::parse(table_prefix).unwrap(); + let 
config = ListingTableConfig::new(mock_store, uri) .with_listing_options(opt) .with_schema(Arc::new(schema)); diff --git a/datafusion/core/src/datasource/object_store_registry.rs b/datafusion/core/src/datasource/object_store_registry.rs index 70336af026c71..5353e1d792acc 100644 --- a/datafusion/core/src/datasource/object_store_registry.rs +++ b/datafusion/core/src/datasource/object_store_registry.rs @@ -19,6 +19,7 @@ //! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS //! and query data inside these systems. +use crate::datasource::listing::ListingTableUrl; use datafusion_common::{DataFusionError, Result}; use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME}; use datafusion_data_access::object_store::ObjectStore; @@ -83,31 +84,22 @@ impl ObjectStoreRegistry { /// - URI with scheme `file://` or no schema will return the default LocalFS store /// - URI with scheme `s3://` will return the S3 store if it's registered /// Returns a tuple with the store and the self-described uri of the file in that store - pub fn get_by_uri<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { - if let Some((scheme, path)) = uri.split_once("://") { - let stores = self.object_stores.read(); - let store = stores - .get(&*scheme.to_lowercase()) - .map(Clone::clone) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "No suitable object store found for {}", - scheme - )) - })?; - Ok((store, path)) - } else { - Ok((Arc::new(LocalFileSystem), uri)) - } + pub fn get_by_uri(&self, uri: &ListingTableUrl) -> Result> { + let stores = self.object_stores.read(); + let store = stores.get(uri.scheme()).map(Clone::clone).ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {}", + uri + )) + })?; + + Ok(store) } } #[cfg(test)] mod tests { - use super::ObjectStoreRegistry; + use super::*; use datafusion_data_access::object_store::local::LocalFileSystem; use std::sync::Arc; @@ -115,24 +107,21 @@ mod tests { fn test_get_by_uri_s3() { let sut = ObjectStoreRegistry::default(); sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {})); - let uri = "s3://bucket/key"; - let (_, path) = sut.get_by_uri(uri).unwrap(); - assert_eq!(path, "bucket/key"); + let uri = ListingTableUrl::parse("s3://bucket/key").unwrap(); + sut.get_by_uri(&uri).unwrap(); } #[test] fn test_get_by_uri_file() { let sut = ObjectStoreRegistry::default(); - let uri = "file:///bucket/key"; - let (_, path) = sut.get_by_uri(uri).unwrap(); - assert_eq!(path, "/bucket/key"); + let uri = ListingTableUrl::parse("file:///bucket/key").unwrap(); + sut.get_by_uri(&uri).unwrap(); } #[test] fn test_get_by_uri_local() { let sut = ObjectStoreRegistry::default(); - let uri = "/bucket/key"; - let (_, path) = sut.get_by_uri(uri).unwrap(); - assert_eq!(path, "/bucket/key"); + let uri = ListingTableUrl::parse("../").unwrap(); + sut.get_by_uri(&uri).unwrap(); } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 619ac13b1365d..b4091c071da2c 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -56,7 +56,7 @@ use crate::catalog::{ schema::{MemorySchemaProvider, SchemaProvider}, }; use crate::dataframe::DataFrame; -use crate::datasource::listing::ListingTableConfig; +use crate::datasource::listing::{ListingTableConfig, ListingTableUrl}; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ @@ -499,26 
+499,25 @@ impl SessionContext { /// Creates a DataFrame for reading an Avro data source. pub async fn read_avro( &self, - uri: impl Into, + uri: impl AsRef, options: AvroReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let uri = ListingTableUrl::parse(uri)?; + let object_store = self.runtime_env().object_store(&uri)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); - let path: String = path.into(); - let resolved_schema = match options.schema { Some(s) => s, None => { listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &uri) .await? } }; - let config = ListingTableConfig::new(object_store, path.clone()) + + let config = ListingTableConfig::new(object_store, uri) .with_listing_options(listing_options) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; @@ -528,26 +527,24 @@ impl SessionContext { /// Creates a DataFrame for reading an Json data source. pub async fn read_json( &mut self, - uri: impl Into, + uri: impl AsRef, options: NdJsonReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let uri = ListingTableUrl::parse(uri)?; + let object_store = self.runtime_env().object_store(&uri)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); - let path: String = path.into(); - let resolved_schema = match options.schema { Some(s) => s, None => { listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &uri) .await? } }; - let config = ListingTableConfig::new(object_store, path) + let config = ListingTableConfig::new(object_store, uri) .with_listing_options(listing_options) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; @@ -566,52 +563,47 @@ impl SessionContext { /// Creates a DataFrame for reading a CSV data source. pub async fn read_csv( &self, - uri: impl Into, + uri: impl AsRef, options: CsvReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let uri = ListingTableUrl::parse(uri)?; + let object_store = self.runtime_env().object_store(&uri)?; let target_partitions = self.copied_config().target_partitions; - let path = path.to_string(); let listing_options = options.to_listing_options(target_partitions); let resolved_schema = match options.schema { Some(s) => Arc::new(s.to_owned()), None => { listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &uri) .await? } }; - let config = ListingTableConfig::new(object_store, path.clone()) + let config = ListingTableConfig::new(object_store, uri.clone()) .with_listing_options(listing_options) .with_schema(resolved_schema); - let provider = ListingTable::try_new(config)?; - let plan = - LogicalPlanBuilder::scan(path, provider_as_source(Arc::new(provider)), None)? - .build()?; - Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) + let provider = ListingTable::try_new(config)?; + self.read_table(Arc::new(provider)) } /// Creates a DataFrame for reading a Parquet data source. 
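The practical upshot of the `impl AsRef<str>` signatures above is that the readers accept plain paths, `file://` URLs, and glob expressions alike; a hypothetical example (the path is made up, and `datafusion::prelude` is assumed to re-export the reader option types):

```rust
// Sketch only: reading a glob of parquet files. ListingTableUrl::parse splits
// "data/year=2021/*.parquet" into a canonical base directory plus a
// glob::Pattern used to filter the listing.
use datafusion::error::Result;
use datafusion::prelude::*;

async fn preview(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .read_parquet("data/year=2021/*.parquet", ParquetReadOptions::default())
        .await?;
    df.show_limit(10).await?;
    Ok(())
}
```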
pub async fn read_parquet( &self, - uri: impl Into, + uri: impl AsRef, options: ParquetReadOptions<'_>, ) -> Result> { - let uri: String = uri.into(); - let (object_store, path) = self.runtime_env().object_store(&uri)?; + let uri = ListingTableUrl::parse(uri)?; + let object_store = self.runtime_env().object_store(&uri)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); - let path: String = path.into(); // with parquet we resolve the schema in all cases let resolved_schema = listing_options - .infer_schema(Arc::clone(&object_store), &path) + .infer_schema(Arc::clone(&object_store), &uri) .await?; - let config = ListingTableConfig::new(object_store, path) + let config = ListingTableConfig::new(object_store, uri) .with_listing_options(listing_options) .with_schema(resolved_schema); @@ -631,23 +623,24 @@ impl SessionContext { /// Registers a table that uses the listing feature of the object store to /// find the files to be processed /// This is async because it might need to resolve the schema. - pub async fn register_listing_table<'a>( - &'a self, - name: &'a str, - uri: &'a str, + pub async fn register_listing_table( + &self, + name: &str, + uri: impl AsRef, options: ListingOptions, provided_schema: Option, ) -> Result<()> { - let (object_store, path) = self.runtime_env().object_store(uri)?; + let table_path = ListingTableUrl::parse(uri)?; + let object_store = self.runtime_env().object_store(&table_path)?; let resolved_schema = match provided_schema { None => { options - .infer_schema(Arc::clone(&object_store), path) + .infer_schema(Arc::clone(&object_store), &table_path) .await? } Some(s) => s, }; - let config = ListingTableConfig::new(object_store, path) + let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(options) .with_schema(resolved_schema); let table = ListingTable::try_new(config)?; diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index 73bbc836ec92d..1f2ea0c834147 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -26,6 +26,7 @@ use crate::{ }, }; +use crate::datasource::listing::ListingTableUrl; use crate::datasource::object_store_registry::ObjectStoreRegistry; use datafusion_common::DataFusionError; use datafusion_data_access::object_store::ObjectStore; @@ -100,10 +101,7 @@ impl RuntimeEnv { } /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { + pub fn object_store(&self, uri: &ListingTableUrl) -> Result> { self.object_store_registry .get_by_uri(uri) .map_err(DataFusionError::from) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 600e24fb8f187..e8852fdc142af 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -52,11 +52,11 @@ //! .to_string(); //! //! let expected = vec![ -//! "+---+--------------------------+", -//! "| a | MIN(tests/example.csv.b) |", -//! "+---+--------------------------+", -//! "| 1 | 2 |", -//! "+---+--------------------------+" +//! "+---+----------------+", +//! "| a | MIN(?table?.b) |", +//! "+---+----------------+", +//! "| 1 | 2 |", +//! "+---+----------------+" //! ]; //! //! 
assert_eq!(pretty_results.trim().lines().collect::>(), expected); diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index feb0bf3226192..7ab4d3a4a2c0f 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -310,11 +310,14 @@ pub fn with_new_children_if_necessary( /// let displayable_plan = displayable(physical_plan.as_ref()); /// let plan_string = format!("{}", displayable_plan.indent()); /// +/// let path = std::env::current_dir().unwrap(); +/// let plan_string = plan_string.replace(path.to_string_lossy().as_ref(), "WORKING_DIR"); +/// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ /// \n CoalesceBatchesExec: target_batch_size=4096\ /// \n FilterExec: a@0 < 5\ /// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ -/// \n CsvExec: files=[tests/example.csv], has_header=true, limit=None, projection=[a]", +/// \n CsvExec: files=[WORKING_DIR/tests/example.csv], has_header=true, limit=None, projection=[a]", /// plan_string.trim()); /// /// let one_line = format!("{}", displayable_plan.one_line()); diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index cc9c02305a764..95a6f16ed3c64 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -48,7 +48,7 @@ impl TestObjectStore { #[async_trait] impl ObjectStore for TestObjectStore { async fn list_file(&self, prefix: &str) -> Result { - let prefix = prefix.to_owned(); + let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string(); Ok(Box::pin( stream::iter( self.files diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 1a6a5028e0cb0..3ebcd250ef3a1 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -203,7 +203,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result Result SchemaRef { mod tests { use super::*; use std::env; + use std::path::Path; #[test] fn test_data_dir() { let udf_env = "get_data_dir"; - let cwd = env::current_dir().unwrap(); - let existing_pb = cwd.join(".."); - let existing = existing_pb.display().to_string(); - let existing_str = existing.as_str(); + let existing_str = ".."; + let existing_pb = Path::new(existing_str).canonicalize().unwrap(); - let non_existing = cwd.join("non-existing-dir").display().to_string(); - let non_existing_str = non_existing.as_str(); + let non_existing_str = "non-existing-dir"; env::set_var(udf_env, non_existing_str); let res = get_data_dir(udf_env, existing_str); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 4297b12b60473..35eb2ca3e77e5 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -20,6 +20,7 @@ use std::{fs, io, sync::Arc}; use async_trait::async_trait; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::{ assert_batches_sorted_eq, datafusion_data_access::{ @@ -182,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mytable", + "file:///mytable", ); let result = ctx @@ -218,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mytable", + "file:///mytable", ); let result = ctx @@ -255,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> { "mytable/date=2021-10-28/file.csv", ], &["date"], - "mytable", + "file:///mytable", ); let result = ctx @@ -419,6 
+420,7 @@ fn register_partitioned_aggregate_csv( let mut options = ListingOptions::new(Arc::new(CsvFormat::default())); options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect(); + let table_path = ListingTableUrl::parse(table_path).unwrap(); let config = ListingTableConfig::new(object_store, table_path) .with_listing_options(options) .with_schema(file_schema); @@ -444,8 +446,12 @@ async fn register_partitioned_alltypes_parquet( options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect(); options.collect_stat = true; + let table_path = ListingTableUrl::parse(format!("mirror:///{}", table_path)).unwrap(); + let store_path = + ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap(); + let file_schema = options - .infer_schema(Arc::clone(&object_store), store_paths[0]) + .infer_schema(Arc::clone(&object_store), &store_path) .await .expect("Parquet schema inference failed"); @@ -487,7 +493,7 @@ impl ObjectStore for MirroringObjectStore { &self, prefix: &str, ) -> datafusion_data_access::Result { - let prefix = prefix.to_owned(); + let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string(); let size = self.file_size; Ok(Box::pin( stream::iter( diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index f72e0f8f8120a..ac91029a763df 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -670,14 +670,12 @@ async fn test_physical_plan_display_indent() { " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c12]", ]; - let data_path = datafusion::test_util::arrow_test_data(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() // normalize paths - .map(|s| s.replace(&data_path, "ARROW_TEST_DATA")) + .map(normalize_for_explain) .collect::>(); - assert_eq!( expected, actual, "expected:\n{:#?}\nactual:\n\n{:#?}\n", @@ -721,12 +719,11 @@ async fn test_physical_plan_display_indent_multi_children() { " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", ]; - let data_path = datafusion::test_util::arrow_test_data(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() // normalize paths - .map(|s| s.replace(&data_path, "ARROW_TEST_DATA")) + .map(normalize_for_explain) .collect::>(); assert_eq!( diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs index 79deaae79ab32..a74076415a49e 100644 --- a/datafusion/core/tests/sql/json.rs +++ b/datafusion/core/tests/sql/json.rs @@ -96,7 +96,7 @@ async fn json_explain() { \n CoalescePartitionsExec\ \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ - \n JsonExec: limit=None, files=[tests/jsons/2.json]\n", + \n JsonExec: limit=None, files=[WORKING_DIR/tests/jsons/2.json]\n", ], ]; assert_eq!(expected, actual); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 566baefab7f42..5bb69c9bcbb71 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -814,6 +814,9 @@ fn normalize_for_explain(s: &str) -> String { let data_path = datafusion::test_util::arrow_test_data(); let s = s.replace(&data_path, "ARROW_TEST_DATA"); + let path = std::env::current_dir().unwrap(); + let s = s.replace(path.to_string_lossy().as_ref(), "WORKING_DIR"); + // 
convert things like partitioning=RoundRobinBatch(16) // to partitioning=RoundRobinBatch(NUM_CORES) let needle = format!("RoundRobinBatch({})", num_cpus::get()); @@ -944,7 +947,7 @@ async fn nyc() -> Result<()> { let ctx = SessionContext::new(); ctx.register_csv( "tripdata", - "file.csv", + "file:///file.csv", CsvReadOptions::new().schema(&schema), ) .await?; diff --git a/datafusion/data-access/Cargo.toml b/datafusion/data-access/Cargo.toml index 7bf447f04cfee..8a556f1f35d97 100644 --- a/datafusion/data-access/Cargo.toml +++ b/datafusion/data-access/Cargo.toml @@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion" readme = "README.md" authors = ["Apache Arrow "] license = "Apache-2.0" -keywords = [ "arrow", "query", "sql" ] +keywords = ["arrow", "query", "sql"] edition = "2021" rust-version = "1.59" @@ -36,7 +36,6 @@ path = "src/lib.rs" async-trait = "0.1.41" chrono = { version = "0.4", default-features = false, features = ["std"] } futures = "0.3" -glob = "0.3.0" parking_lot = "0.12" tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/data-access/src/object_store/local.rs b/datafusion/data-access/src/object_store/local.rs index 604539814f19f..7b0cab7cd00cb 100644 --- a/datafusion/data-access/src/object_store/local.rs +++ b/datafusion/data-access/src/object_store/local.rs @@ -317,30 +317,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_globbing() -> Result<()> { - let tmp = tempdir()?; - let a1_path = tmp.path().join("a1.txt"); - let a2_path = tmp.path().join("a2.txt"); - let b1_path = tmp.path().join("b1.txt"); - File::create(&a1_path)?; - File::create(&a2_path)?; - File::create(&b1_path)?; - - let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap()); - let mut all_files = HashSet::new(); - let mut files = LocalFileSystem.glob_file(&glob).await?; - while let Some(file) = files.next().await { - let file = file?; - assert_eq!(file.size(), 0); - all_files.insert(file.path().to_owned()); - } - - assert_eq!(all_files.len(), 2); - assert!(all_files.contains(a1_path.to_str().unwrap())); - assert!(all_files.contains(a2_path.to_str().unwrap())); - - Ok(()) - } } diff --git a/datafusion/data-access/src/object_store/mod.rs b/datafusion/data-access/src/object_store/mod.rs index 93a930a6dd24e..496a5494fa01f 100644 --- a/datafusion/data-access/src/object_store/mod.rs +++ b/datafusion/data-access/src/object_store/mod.rs @@ -21,15 +21,11 @@ pub mod local; use std::fmt::Debug; use std::io::Read; -use std::path; -use std::path::{Component, Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; -use futures::future::ready; -use futures::{AsyncRead, Stream, StreamExt, TryStreamExt}; -use glob::Pattern; +use futures::{AsyncRead, Stream}; use crate::{FileMeta, ListEntry, Result, SizedFile}; @@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug { /// Returns all the files in path `prefix` async fn list_file(&self, prefix: &str) -> Result; - /// Calls `list_file` with a suffix filter - async fn list_file_with_suffix( - &self, - prefix: &str, - suffix: &str, - ) -> Result { - self.glob_file_with_suffix(prefix, suffix).await - } - - /// Returns all the files matching `glob_pattern` - async fn glob_file(&self, glob_pattern: &str) -> Result { - if !contains_glob_start_char(glob_pattern) { - self.list_file(glob_pattern).await - } else { - let normalized_glob_pb = normalize_path(Path::new(glob_pattern)); - let normalized_glob_pattern = - 
normalized_glob_pb.as_os_str().to_str().unwrap(); - let start_path = - find_longest_search_path_without_glob_pattern(normalized_glob_pattern); - let file_stream = self.list_file(&start_path).await?; - let pattern = Pattern::new(normalized_glob_pattern).unwrap(); - Ok(Box::pin(file_stream.filter(move |fr| { - let matches_pattern = match fr { - Ok(f) => pattern.matches(f.path()), - Err(_) => true, - }; - async move { matches_pattern } - }))) - } - } - - /// Calls `glob_file` with a suffix filter - async fn glob_file_with_suffix( - &self, - glob_pattern: &str, - suffix: &str, - ) -> Result { - let files_to_consider = match contains_glob_start_char(glob_pattern) { - true => self.glob_file(glob_pattern).await, - false => self.list_file(glob_pattern).await, - }?; - - match suffix.is_empty() { - true => Ok(files_to_consider), - false => filter_suffix(files_to_consider, suffix), - } - } - /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. async fn list_dir( @@ -137,144 +85,3 @@ pub trait ObjectStore: Sync + Send + Debug { /// Get object reader for one file fn file_reader(&self, file: SizedFile) -> Result>; } - -/// Normalize a path without requiring it to exist on the filesystem (path::canonicalize) -pub fn normalize_path>(path: P) -> PathBuf { - let ends_with_slash = path - .as_ref() - .to_str() - .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR)); - let mut normalized = PathBuf::new(); - for component in path.as_ref().components() { - match &component { - Component::ParentDir => { - if !normalized.pop() { - normalized.push(component); - } - } - _ => { - normalized.push(component); - } - } - } - if ends_with_slash { - normalized.push(""); - } - normalized -} - -const GLOB_START_CHARS: [char; 3] = ['?', '*', '[']; - -/// Determine whether the path contains a globbing character -fn contains_glob_start_char(path: &str) -> bool { - path.chars().any(|c| GLOB_START_CHARS.contains(&c)) -} - -/// Filters the file_stream to only contain files that end with suffix -fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result { - let suffix = suffix.to_owned(); - Ok(Box::pin( - file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))), - )) -} - -fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String { - // in case the glob_pattern is not actually a glob pattern, take the entire thing - if !contains_glob_start_char(glob_pattern) { - glob_pattern.to_string() - } else { - // take all the components of the path (left-to-right) which do not contain a glob pattern - let components_in_glob_pattern = Path::new(glob_pattern).components(); - let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new(); - for component_in_glob_pattern in components_in_glob_pattern { - let component_as_str = - component_in_glob_pattern.as_os_str().to_str().unwrap(); - if contains_glob_start_char(component_as_str) { - break; - } - path_buf_for_longest_search_path_without_glob_pattern - .push(component_in_glob_pattern); - } - - let mut result = path_buf_for_longest_search_path_without_glob_pattern - .to_str() - .unwrap() - .to_string(); - - // when we're not at the root, append a separator - if path_buf_for_longest_search_path_without_glob_pattern - .components() - .count() - > 1 - { - result.push(path::MAIN_SEPARATOR); - } - result - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_is_glob_path() -> Result<()> { 
- assert!(!contains_glob_start_char("/")); - assert!(!contains_glob_start_char("/test")); - assert!(!contains_glob_start_char("/test/")); - assert!(contains_glob_start_char("/test*")); - Ok(()) - } - - fn test_longest_base_path(input: &str, expected: &str) { - assert_eq!( - find_longest_search_path_without_glob_pattern(input), - expected, - "testing find_longest_search_path_without_glob_pattern with {}", - input - ); - } - - #[tokio::test] - async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> { - // no glob patterns, thus we get the full path (as-is) - test_longest_base_path("/", "/"); - test_longest_base_path("/a.txt", "/a.txt"); - test_longest_base_path("/a", "/a"); - test_longest_base_path("/a/", "/a/"); - test_longest_base_path("/a/b", "/a/b"); - test_longest_base_path("/a/b/", "/a/b/"); - test_longest_base_path("/a/b.txt", "/a/b.txt"); - test_longest_base_path("/a/b/c.txt", "/a/b/c.txt"); - // glob patterns, thus we build the longest path (os-specific) - use path::MAIN_SEPARATOR; - test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}")); - test_longest_base_path( - "/a/*b.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/*/b.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/b/[123]/file*.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/b*.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - "/a/b/**/c*.txt", - &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), - ); - test_longest_base_path( - &format!("{}/alltypes_plain*.parquet", "/a/b/c//"), // https://github.com/apache/arrow-datafusion/issues/2465 - &format!( - "{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}c{MAIN_SEPARATOR}" - ), - ); - Ok(()) - } -} From 18ceed5656f87e02f3ebddc9c8dbdc9e09816efe Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 May 2022 12:25:37 +0100 Subject: [PATCH 02/13] Fix explain test CPU count sensitivity --- datafusion/core/tests/sql/explain_analyze.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index ac91029a763df..dfa5618fe31e9 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -642,7 +642,7 @@ order by #[tokio::test] async fn test_physical_plan_display_indent() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(3); + let config = SessionConfig::new().with_target_partitions(9000); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await.unwrap(); let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \ @@ -662,11 +662,11 @@ async fn test_physical_plan_display_indent() { " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)", " AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", " 
CoalesceBatchesExec: target_batch_size=4096", " FilterExec: c12@1 < CAST(10 AS Float64)", - " RepartitionExec: partitioning=RoundRobinBatch(3)", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c12]", ]; @@ -686,7 +686,7 @@ async fn test_physical_plan_display_indent() { #[tokio::test] async fn test_physical_plan_display_indent_multi_children() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(3); + let config = SessionConfig::new().with_target_partitions(9000); let ctx = SessionContext::with_config(config); // ensure indenting works for nodes with multiple children register_aggregate_csv(&ctx).await.unwrap(); @@ -706,16 +706,16 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)", + " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)", " ProjectionExec: expr=[c1@0 as c1]", " ProjectionExec: expr=[c1@0 as c1]", - " RepartitionExec: partitioning=RoundRobinBatch(3)", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 3)", + " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)", " ProjectionExec: expr=[c2@0 as c2]", " ProjectionExec: expr=[c1@0 as c2]", - " RepartitionExec: partitioning=RoundRobinBatch(3)", + " RepartitionExec: partitioning=RoundRobinBatch(9000)", " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", ]; From 9c8b5465e24ab117f335d809911f73bab183f94a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 May 2022 13:52:52 +0100 Subject: [PATCH 03/13] Windows compatibility --- .../core/src/datasource/listing/path.rs | 95 +++++++++++++------ datafusion/core/src/test_util.rs | 14 +-- datafusion/core/tests/sql/mod.rs | 59 +++++++++--- 3 files changed, 120 insertions(+), 48 deletions(-) diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/path.rs index dcb179d52f171..41a3ec08746f9 100644 --- a/datafusion/core/src/datasource/listing/path.rs +++ b/datafusion/core/src/datasource/listing/path.rs @@ -21,6 +21,8 @@ use datafusion_data_access::FileMeta; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; +use std::borrow::Cow; +use std::path::{is_separator, MAIN_SEPARATOR}; use url::Url; /// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`] @@ -99,17 +101,25 @@ impl ListingTableUrl { /// Returns the path as expected by [`ObjectStore`] /// - /// In particular for file scheme URLs, this has a leading `/` - /// and describes an absolute path on the local filesystem + /// In particular for file scheme URLs, this is an absolute + /// on the local filesystem in the OS-specific path representation /// - /// For other URLs, this also contains the host component - /// and lacks a leading `/` + /// For other URLs, 
this is the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
     ///
     /// TODO: Handle paths consistently (#2489)
-    fn prefix(&self) -> &str {
+    fn prefix(&self) -> Cow<'_, str> {
         match self.scheme() {
-            "file" => self.url.path(),
-            _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath],
+            "file" => match MAIN_SEPARATOR {
+                '/' => Cow::Borrowed(self.url.path()),
+                _ => {
+                    let path = self.url.to_file_path().unwrap();
+                    Cow::Owned(path.to_string_lossy().to_string())
+                }
+            },
+            _ => Cow::Borrowed(
+                &self.url[url::Position::BeforeHost..url::Position::AfterPath],
+            ),
         }
     }

@@ -119,10 +129,12 @@ impl ListingTableUrl {
         &'a self,
         path: &'b str,
     ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
         // Ignore empty path segments
         let diff = itertools::diff_with(
-            path.split('/').filter(|s| !s.is_empty()),
-            self.prefix().split('/').filter(|s| !s.is_empty()),
+            // TODO: Handle paths consistently (#2489)
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
             |a, b| a == b,
         );

@@ -139,24 +151,27 @@ impl ListingTableUrl {
         store: &'a dyn ObjectStore,
         file_extension: &'a str,
     ) -> BoxStream<'a, Result<FileMeta>> {
-        futures::stream::once(store.list_file(self.prefix()))
-            .try_flatten()
-            .map_err(DataFusionError::IoError)
-            .try_filter(move |meta| {
-                let path = meta.path();
-
-                let extension_match = path.ends_with(file_extension);
-                let glob_match = match &self.glob {
-                    Some(glob) => match path.strip_prefix(self.url.path()) {
-                        Some(stripped) => glob.matches(stripped),
-                        None => false,
-                    },
-                    None => true,
-                };
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match path.strip_prefix(self.url.path()) {
+                    Some(stripped) => glob.matches(stripped),
+                    None => false,
+                },
+                None => true,
+            };

-                futures::future::ready(extension_match && glob_match)
-            })
-            .boxed()
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
     }
 }

@@ -194,10 +209,32 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
 #[cfg(test)]
 mod tests {
-    use crate::datasource::listing::path::split_glob_expression;
+    use super::*;
+    use std::path::Path;
+
+    #[test]
+    fn test_prefix_path() {
+        let parent = Path::new("../").canonicalize().unwrap();
+        let url = ListingTableUrl::parse(parent.to_string_lossy()).unwrap();
+
+        let path = Path::new(".").canonicalize().unwrap();
+        let path = path.to_string_lossy();
+
+        assert_eq!(url.strip_prefix(path.as_ref()).unwrap().count(), 1);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+    }

-    #[tokio::test]
-    async fn test_split_glob() {
+    #[test]
+    fn test_split_glob() {
         fn test(input: &str, expected: Option<(&str, &str)>) {
             assert_eq!(
                 split_glob_expression(input),
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 3ebcd250ef3a1..1a6a5028e0cb0 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -203,7 +203,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result Result 
SchemaRef { mod tests { use super::*; use std::env; - use std::path::Path; #[test] fn test_data_dir() { let udf_env = "get_data_dir"; + let cwd = env::current_dir().unwrap(); - let existing_str = ".."; - let existing_pb = Path::new(existing_str).canonicalize().unwrap(); + let existing_pb = cwd.join(".."); + let existing = existing_pb.display().to_string(); + let existing_str = existing.as_str(); - let non_existing_str = "non-existing-dir"; + let non_existing = cwd.join("non-existing-dir").display().to_string(); + let non_existing_str = non_existing.as_str(); env::set_var(udf_env, non_existing_str); let res = get_data_dir(udf_env, existing_str); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 20fb593ef8728..5fefdc986e8f7 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -48,7 +48,9 @@ use datafusion::{execution::context::SessionContext, physical_plan::displayable} use datafusion_expr::Volatility; use std::fs::File; use std::io::Write; +use std::path::PathBuf; use tempfile::TempDir; +use url::Url; /// A macro to assert that some particular line contains two substrings /// @@ -811,28 +813,59 @@ pub fn table_with_sequence( Ok(Arc::new(MemTable::try_new(schema, partitions)?)) } -// Normalizes parts of an explain plan that vary from run to run (such as path) -fn normalize_for_explain(s: &str) -> String { - // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv - // to ARROW_TEST_DATA/csv/aggregate_test_100.csv - let data_path = datafusion::test_util::arrow_test_data(); - let s = s.replace(&data_path, "ARROW_TEST_DATA"); +pub struct ExplainNormalizer { + replacements: Vec<(String, String)>, +} + +impl ExplainNormalizer { + fn new() -> Self { + let mut replacements = vec![]; + + let mut push_path = |path: PathBuf, key: &str| { + // Push path as is + replacements.push((path.to_string_lossy().to_string(), key.to_string())); + + // Push canonical version of path + let canonical = path.canonicalize().unwrap(); + replacements.push((canonical.to_string_lossy().to_string(), key.to_string())); + + // Push URL representation of path, to handle windows + let url = Url::from_directory_path(canonical).unwrap(); + replacements.push((url.path().to_string(), key.to_string())); + }; + + push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA"); + push_path(std::env::current_dir().unwrap(), "WORKING_DIR"); - let path = std::env::current_dir().unwrap(); - let s = s.replace(path.to_string_lossy().as_ref(), "WORKING_DIR"); + // convert things like partitioning=RoundRobinBatch(16) + // to partitioning=RoundRobinBatch(NUM_CORES) + let needle = format!("RoundRobinBatch({})", num_cpus::get()); + replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string())); - // convert things like partitioning=RoundRobinBatch(16) - // to partitioning=RoundRobinBatch(NUM_CORES) - let needle = format!("RoundRobinBatch({})", num_cpus::get()); - s.replace(&needle, "RoundRobinBatch(NUM_CORES)") + Self { replacements } + } + + fn normalize(&self, s: impl Into) -> String { + let mut s = s.into(); + for (from, to) in &self.replacements { + s = s.replace(from, to); + } + s + } +} + +// Normalizes parts of an explain plan that vary from run to run (such as path) +fn normalize_for_explain(s: &str) -> String { + ExplainNormalizer::new().normalize(s) } /// Applies normalize_for_explain to every line fn normalize_vec_for_explain(v: Vec>) -> Vec> { + let normalizer = ExplainNormalizer::new(); v.into_iter() .map(|l| { 
l.into_iter()
-                .map(|s| normalize_for_explain(&s))
+                .map(|s| normalizer.normalize(s))
                 .collect::<Vec<_>>()
         })
         .collect::<Vec<_>>()
 }

From 230e42baa54f54769633fd51a1f3e4dfcf152968 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies 
Date: Fri, 27 May 2022 16:47:54 +0100
Subject: [PATCH 04/13] More windows pacification

---
 .../core/src/datasource/listing/path.rs       | 65 ++++++++++++-------
 datafusion/core/tests/sql/explain_analyze.rs  |  6 +-
 datafusion/core/tests/sql/mod.rs              |  5 --
 3 files changed, 44 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/path.rs
index 41a3ec08746f9..510d0bbeb41ff 100644
--- a/datafusion/core/src/datasource/listing/path.rs
+++ b/datafusion/core/src/datasource/listing/path.rs
@@ -21,6 +21,7 @@ use datafusion_data_access::FileMeta;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use glob::Pattern;
+use itertools::Itertools;
 use std::borrow::Cow;
 use std::path::{is_separator, MAIN_SEPARATOR};
 use url::Url;
@@ -40,9 +41,10 @@ impl ListingTableUrl {
     ///
     /// # Paths without a Scheme
     ///
-    /// If no scheme is provided, the string will be interpreted as a
-    /// path on the local filesystem, using the operating system's
-    /// standard path delimiter - i.e. `\` on Windows, `/` on Unix.
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
     ///
     /// If the path contains any of `'?', '*', '['`, it will be considered
     /// a glob expression and resolved as described in the section below.
@@ -70,28 +72,37 @@ impl ListingTableUrl {
     /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
     pub fn parse(s: impl AsRef<str>) -> Result<Self> {
         let s = s.as_ref();
-        Ok(match Url::parse(s) {
-            Ok(url) => Self { url, glob: None },
-            Err(url::ParseError::RelativeUrlWithoutBase) => {
-                let (prefix, glob) = match split_glob_expression(s) {
-                    Some((prefix, glob)) => {
-                        let glob = Pattern::new(glob)
-                            .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                        (prefix, Some(glob))
-                    }
-                    None => (s, None),
-                };

-                let path = std::path::Path::new(prefix).canonicalize()?;
-                let url = match path.is_file() {
-                    true => Url::from_file_path(path).unwrap(),
-                    false => Url::from_directory_path(path).unwrap(),
-                };
+        // This is necessary to handle the case of a path starting with a drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }

-                Self { url, glob }
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
             }
-            Err(e) => return Err(DataFusionError::External(Box::new(e))),
-        })
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
     }

     /// Returns the URL scheme
@@ -125,6 +136,8 @@ impl ListingTableUrl {
     /// 
Strips the prefix of this [`ListingTableUrl`] from the provided path, returning /// an iterator of the remaining path segments + /// + /// TODO: Handle paths consistently (#2489) pub(crate) fn strip_prefix<'a, 'b: 'a>( &'a self, path: &'b str, @@ -132,7 +145,6 @@ impl ListingTableUrl { let prefix = self.prefix(); // Ignore empty path segments let diff = itertools::diff_with( - // TODO: Handle paths consistently (#2489) path.split(is_separator).filter(|s| !s.is_empty()), prefix.split(is_separator).filter(|s| !s.is_empty()), |a, b| a == b, @@ -162,8 +174,11 @@ impl ListingTableUrl { let extension_match = path.ends_with(file_extension); let glob_match = match &self.glob { - Some(glob) => match path.strip_prefix(self.url.path()) { - Some(stripped) => glob.matches(stripped), + Some(glob) => match self.strip_prefix(path) { + Some(mut segments) => { + let stripped = segments.join("/"); + glob.matches(&stripped) + } None => false, }, None => true, diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index dfa5618fe31e9..759edf6fc55bc 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -670,11 +670,12 @@ async fn test_physical_plan_display_indent() { " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c12]", ]; + let normalizer = ExplainNormalizer::new(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() // normalize paths - .map(normalize_for_explain) + .map(|s| normalizer.normalize(s)) .collect::>(); assert_eq!( expected, actual, @@ -719,11 +720,12 @@ async fn test_physical_plan_display_indent_multi_children() { " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]", ]; + let normalizer = ExplainNormalizer::new(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() // normalize paths - .map(normalize_for_explain) + .map(|s| normalizer.normalize(s)) .collect::>(); assert_eq!( diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 5fefdc986e8f7..8ce4ce617191f 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -854,11 +854,6 @@ impl ExplainNormalizer { } } -// Normalizes parts of an explain plan that vary from run to run (such as path) -fn normalize_for_explain(s: &str) -> String { - ExplainNormalizer::new().normalize(s) -} - /// Applies normalize_for_explain to every line fn normalize_vec_for_explain(v: Vec>) -> Vec> { let normalizer = ExplainNormalizer::new(); From deef0b6cd09c8fe521e70a3dc721084964808f3f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 29 May 2022 12:59:27 +0100 Subject: [PATCH 05/13] Even more windows pacification --- .../core/src/datasource/listing/path.rs | 30 ++++++++----------- datafusion/core/tests/sql/mod.rs | 9 ++++-- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/path.rs index 510d0bbeb41ff..8d3f1a032a1bc 100644 --- a/datafusion/core/src/datasource/listing/path.rs +++ b/datafusion/core/src/datasource/listing/path.rs @@ -22,8 +22,7 @@ use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; use itertools::Itertools; -use std::borrow::Cow; -use std::path::{is_separator, MAIN_SEPARATOR}; +use std::path::is_separator; use url::Url; /// A parsed URL identifying 
files for a listing table, see [`ListingTableUrl::parse`] @@ -119,18 +118,13 @@ impl ListingTableUrl { /// delimited by `/`, and with no leading `/` /// /// TODO: Handle paths consistently (#2489) - fn prefix(&self) -> Cow<'_, str> { + fn prefix(&self) -> &str { match self.scheme() { - "file" => match MAIN_SEPARATOR { - '/' => Cow::Borrowed(self.url.path()), - _ => { - let path = self.url.to_file_path().unwrap(); - Cow::Owned(path.to_string_lossy().to_string()) - } + "file" => match cfg!(target_family = "windows") { + true => self.url.path().strip_prefix('/').unwrap(), + false => self.url.path(), }, - _ => Cow::Borrowed( - &self.url[url::Position::BeforeHost..url::Position::AfterPath], - ), + _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath], } } @@ -225,17 +219,17 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> { #[cfg(test)] mod tests { use super::*; - use std::path::Path; #[test] fn test_prefix_path() { - let parent = Path::new("../").canonicalize().unwrap(); - let url = ListingTableUrl::parse(parent.to_string_lossy()).unwrap(); + let root = std::env::current_dir().unwrap(); + let root = root.to_string_lossy(); - let path = Path::new(".").canonicalize().unwrap(); - let path = path.to_string_lossy(); + let url = ListingTableUrl::parse(&root).unwrap(); + let child = format!("{}/partition/file", root); - assert_eq!(url.strip_prefix(path.as_ref()).unwrap().count(), 1); + let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect(); + assert_eq!(prefix, vec!["partition", "file"]); } #[test] diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 8ce4ce617191f..3e19dbcb990b2 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -829,9 +829,12 @@ impl ExplainNormalizer { let canonical = path.canonicalize().unwrap(); replacements.push((canonical.to_string_lossy().to_string(), key.to_string())); - // Push URL representation of path, to handle windows - let url = Url::from_directory_path(canonical).unwrap(); - replacements.push((url.path().to_string(), key.to_string())); + if cfg!(target_family = "windows") { + // Push URL representation of path, to handle windows + let url = Url::from_file_path(canonical).unwrap(); + let path = url.path().strip_prefix('/').unwrap(); + replacements.push((path.to_string(), key.to_string())); + } }; push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA"); From 6c311bab297bb17faf0205198539859735ea3c80 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 29 May 2022 17:24:03 +0100 Subject: [PATCH 06/13] Fix windows doctest --- datafusion/core/src/physical_plan/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 7ab4d3a4a2c0f..2a89ea0df92d6 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -288,6 +288,7 @@ pub fn with_new_children_if_necessary( /// ``` /// use datafusion::prelude::*; /// use datafusion::physical_plan::displayable; +/// use std::path::is_separator; /// /// #[tokio::main] /// async fn main() { @@ -310,8 +311,9 @@ pub fn with_new_children_if_necessary( /// let displayable_plan = displayable(physical_plan.as_ref()); /// let plan_string = format!("{}", displayable_plan.indent()); /// -/// let path = std::env::current_dir().unwrap(); -/// let plan_string = plan_string.replace(path.to_string_lossy().as_ref(), "WORKING_DIR"); +/// let working_directory = 
std::env::current_dir().unwrap(); +/// let normalized = working_directory.to_string_lossy().replace(is_separator, "/"); +/// let plan_string = plan_string.replace(&normalized, "WORKING_DIR"); /// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ /// \n CoalesceBatchesExec: target_batch_size=4096\ From 63df81c841bbe06e30f1bcc6cba03632d5823bcc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 29 May 2022 22:52:06 +0100 Subject: [PATCH 07/13] Add ObjectStoreUrl --- datafusion/core/src/catalog/schema.rs | 4 +- .../core/src/datasource/file_format/mod.rs | 2 + .../core/src/datasource/listing/path.rs | 26 ++++- .../core/src/datasource/listing/table.rs | 1 + datafusion/core/src/datasource/mod.rs | 2 +- ...ject_store_registry.rs => object_store.rs} | 104 +++++++++++++++--- datafusion/core/src/execution/runtime_env.rs | 8 +- .../src/physical_optimizer/repartition.rs | 2 + .../src/physical_plan/file_format/avro.rs | 18 ++- .../src/physical_plan/file_format/json.rs | 25 ++++- .../core/src/physical_plan/file_format/mod.rs | 7 +- .../src/physical_plan/file_format/parquet.rs | 5 + datafusion/core/src/test/mod.rs | 2 + datafusion/core/tests/row.rs | 11 +- 14 files changed, 179 insertions(+), 38 deletions(-) rename datafusion/core/src/datasource/{object_store_registry.rs => object_store.rs} (57%) diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index be690470fe9e9..cd823d1545d29 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -24,7 +24,7 @@ use std::collections::HashMap; use std::sync::Arc; use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; -use crate::datasource::object_store_registry::ObjectStoreRegistry; +use crate::datasource::object_store::ObjectStoreRegistry; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use datafusion_data_access::object_store::ObjectStore; @@ -160,7 +160,7 @@ impl ObjectStoreSchemaProvider { pub fn object_store(&self, uri: &ListingTableUrl) -> Result> { self.object_store_registry .lock() - .get_by_uri(uri) + .get_by_url(uri) .map_err(DataFusionError::from) } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 669ed0efdde28..eae86fa9cca74 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -85,6 +85,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { pub(crate) mod test_util { use super::*; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use datafusion_data_access::object_store::local::{ local_unpartitioned_file, LocalFileSystem, }; @@ -115,6 +116,7 @@ pub(crate) mod test_util { .create_physical_plan( FileScanConfig { object_store: store, + object_store_url: ObjectStoreUrl::local_filesystem(), file_schema, file_groups, statistics, diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/path.rs index 8d3f1a032a1bc..2b0e3344a3c21 100644 --- a/datafusion/core/src/datasource/listing/path.rs +++ b/datafusion/core/src/datasource/listing/path.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
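Every `FileScanConfig` construction site gains the new `object_store_url` field added in this commit; the shape is the same everywhere. A representative sketch against the post-refactor API (import paths assumed from this revision of the crate, values are placeholders):

    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::object_store::ObjectStoreUrl;
    use datafusion::physical_plan::file_format::FileScanConfig;
    use datafusion::physical_plan::Statistics;

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Both the store itself and the URL identifying it are carried,
        // mirroring the test fixtures in this patch
        let config = FileScanConfig {
            object_store: Arc::new(LocalFileSystem {}),
            object_store_url: ObjectStoreUrl::local_filesystem(),
            file_schema: schema,
            file_groups: vec![vec![PartitionedFile::new("a.parquet".to_string(), 100)]],
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
        };

        println!("{}", config.object_store_url);
    }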
+use crate::datasource::object_store::ObjectStoreUrl; use datafusion_common::{DataFusionError, Result}; use datafusion_data_access::object_store::ObjectStore; use datafusion_data_access::FileMeta; @@ -182,11 +183,34 @@ impl ListingTableUrl { }) .boxed() } + + /// Returns this [`ListingTableUrl`] as a string + pub fn as_str(&self) -> &str { + self.as_ref() + } + + /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`] + pub fn object_store(&self) -> ObjectStoreUrl { + let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath]; + ObjectStoreUrl::parse(url).unwrap() + } +} + +impl AsRef for ListingTableUrl { + fn as_ref(&self) -> &str { + self.url.as_ref() + } +} + +impl AsRef for ListingTableUrl { + fn as_ref(&self) -> &Url { + &self.url + } } impl std::fmt::Display for ListingTableUrl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.url) + self.as_str().fmt(f) } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 94db3350341cf..3b0f4340b4884 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -322,6 +322,7 @@ impl TableProvider for ListingTable { .create_physical_plan( FileScanConfig { object_store: Arc::clone(&self.object_store), + object_store_url: self.table_path.object_store(), file_schema: Arc::clone(&self.file_schema), file_groups: partitioned_file_lists, statistics, diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index f3cc0a04eecdb..65fc2adcbaa07 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -23,7 +23,7 @@ pub mod empty; pub mod file_format; pub mod listing; pub mod memory; -pub mod object_store_registry; +pub mod object_store; pub mod view; use futures::Stream; diff --git a/datafusion/core/src/datasource/object_store_registry.rs b/datafusion/core/src/datasource/object_store.rs similarity index 57% rename from datafusion/core/src/datasource/object_store_registry.rs rename to datafusion/core/src/datasource/object_store.rs index 5353e1d792acc..6944a9a8c50b1 100644 --- a/datafusion/core/src/datasource/object_store_registry.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -19,14 +19,67 @@ //! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS //! and query data inside these systems. 
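Both `ListingTableUrl` and the `ObjectStoreUrl` introduced below lean on `url::Position` to slice a parsed URL into components. A standalone demonstration of those slicing semantics, assuming only the `url` crate:

    use url::{Position, Url};

    fn main() {
        let url = Url::parse("s3://bucket/foo/bar?x=1").unwrap();

        // Host plus path, as used by ListingTableUrl::prefix
        assert_eq!(&url[Position::BeforeHost..Position::AfterPath], "bucket/foo/bar");

        // Scheme plus authority, as used by ListingTableUrl::object_store
        assert_eq!(&url[Position::BeforeScheme..Position::BeforePath], "s3://bucket");

        // Everything from the path onwards, which ObjectStoreUrl::parse
        // requires to be empty or "/"
        assert_eq!(&url[Position::BeforePath..], "/foo/bar?x=1");
    }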
-use crate::datasource::listing::ListingTableUrl;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_data_access::object_store::local::{LocalFileSystem, LOCAL_SCHEME};
 use datafusion_data_access::object_store::ObjectStore;
 use parking_lot::RwLock;
 use std::collections::HashMap;
-use std::fmt;
 use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if remaining != "" && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: {}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}

 /// Object store registry
 pub struct ObjectStoreRegistry {
@@ -34,8 +87,8 @@ pub struct ObjectStoreRegistry {
     pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
 }

-impl fmt::Debug for ObjectStoreRegistry {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         f.debug_struct("ObjectStoreRegistry")
             .field(
                 "schemes",
@@ -84,12 +137,13 @@ impl ObjectStoreRegistry {
     /// - URI with scheme `file://` or no schema will return the default LocalFS store
     /// - URI with scheme `s3://` will return the S3 store if it's registered
     /// Returns a tuple with the store and the self-described uri of the file in that store
-    pub fn get_by_uri(&self, uri: &ListingTableUrl) -> Result<Arc<dyn ObjectStore>> {
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
+        let url = url.as_ref();
         let stores = self.object_stores.read();
-        let store = stores.get(uri.scheme()).map(Clone::clone).ok_or_else(|| {
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {
             DataFusionError::Internal(format!(
                 "No suitable object store found for {}",
-                uri
+                url
             ))
         })?;

@@ -100,28 +154,52 @@ impl ObjectStoreRegistry {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::listing::ListingTableUrl;
     use datafusion_data_access::object_store::local::LocalFileSystem;
     use std::sync::Arc;

     #[test]
-    fn test_get_by_uri_s3() {
+    fn test_object_store_url() {
+        let listing = ListingTableUrl::parse("file:///").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "file:///");
+
+        let file = ObjectStoreUrl::parse("file://").unwrap();
+        assert_eq!(file.as_str(), "file:///");
+
+        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "s3://bucket/");
+
+        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+        assert_eq!(url.as_str(), "s3://bucket/");
+
+        let err = 
ObjectStoreUrl::parse("s3://bucket?").unwrap_err(); + assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?"); + + let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err(); + assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar"); + } + + #[test] + fn test_get_by_url_s3() { let sut = ObjectStoreRegistry::default(); sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {})); let uri = ListingTableUrl::parse("s3://bucket/key").unwrap(); - sut.get_by_uri(&uri).unwrap(); + sut.get_by_url(&uri).unwrap(); } #[test] - fn test_get_by_uri_file() { + fn test_get_by_url_file() { let sut = ObjectStoreRegistry::default(); let uri = ListingTableUrl::parse("file:///bucket/key").unwrap(); - sut.get_by_uri(&uri).unwrap(); + sut.get_by_url(&uri).unwrap(); } #[test] - fn test_get_by_uri_local() { + fn test_get_by_url_local() { let sut = ObjectStoreRegistry::default(); let uri = ListingTableUrl::parse("../").unwrap(); - sut.get_by_uri(&uri).unwrap(); + sut.get_by_url(&uri).unwrap(); } } diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index 1f2ea0c834147..26d1471a1df5d 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -26,13 +26,13 @@ use crate::{ }, }; -use crate::datasource::listing::ListingTableUrl; -use crate::datasource::object_store_registry::ObjectStoreRegistry; +use crate::datasource::object_store::ObjectStoreRegistry; use datafusion_common::DataFusionError; use datafusion_data_access::object_store::ObjectStore; use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::sync::Arc; +use url::Url; #[derive(Clone)] /// Execution runtime environment. 
@@ -101,9 +101,9 @@ impl RuntimeEnv { } /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store(&self, uri: &ListingTableUrl) -> Result> { + pub fn object_store(&self, url: impl AsRef) -> Result> { self.object_store_registry - .get_by_uri(uri) + .get_by_url(url) .map_err(DataFusionError::from) } } diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 3b2c4515eafde..f24e1e4e6f811 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -241,6 +241,7 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; @@ -261,6 +262,7 @@ mod tests { Arc::new(ParquetExec::new( FileScanConfig { object_store: TestObjectStore::new_arc(&[("x", 100)]), + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), file_schema: schema(), file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], statistics: Statistics::default(), diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index fc56ce1d87fe1..b761a83062e93 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -167,6 +167,7 @@ impl ExecutionPlan for AvroExec { mod tests { use crate::datasource::file_format::{avro::AvroFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use arrow::datatypes::{DataType, Field, Schema}; @@ -188,6 +189,7 @@ mod tests { let avro_exec = AvroExec::new(FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![meta.into()]], file_schema, statistics: Statistics::default(), @@ -241,9 +243,10 @@ mod tests { async fn avro_exec_missing_column() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); - let actual_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?; + let actual_schema = AvroFormat {}.infer_schema(&object_store, &[meta.clone()]).await?; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -253,7 +256,8 @@ mod tests { let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); let avro_exec = AvroExec::new(FileScanConfig { - object_store: store, + object_store, + object_store_url, file_groups: vec![vec![meta.into()]], file_schema, statistics: Statistics::default(), @@ -307,9 +311,10 @@ mod tests { async fn avro_exec_with_partition() -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); - let store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = 
local_unpartitioned_file(filename); - let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?; + let file_schema = AvroFormat {}.infer_schema(&object_store, &[meta.clone()]).await?; let mut partitioned_file = PartitionedFile::from(meta); partitioned_file.partition_values = @@ -319,7 +324,8 @@ mod tests { // select specific columns of the files as well as the partitioning // column which is supposed to be the last column in the table schema. projection: Some(vec![0, 1, file_schema.fields().len(), 2]), - object_store: store, + object_store, + object_store_url, file_groups: vec![vec![partitioned_file]], file_schema, statistics: Statistics::default(), diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 5470f6d57fd13..3a179a7a29fb6 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -195,6 +195,7 @@ mod tests { use crate::datafusion_data_access::object_store::local::LocalFileSystem; use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; + use crate::datasource::object_store::ObjectStoreUrl; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; use datafusion_data_access::object_store::local::local_unpartitioned_file; @@ -205,9 +206,14 @@ mod tests { const TEST_DATA_BASE: &str = "tests/jsons"; - async fn prepare_store( - ) -> (Arc, Vec>, SchemaRef) { + async fn prepare_store() -> ( + Arc, + ObjectStoreUrl, + Vec>, + SchemaRef, + ) { let store = Arc::new(LocalFileSystem {}) as _; + let store_url = ObjectStoreUrl::local_filesystem(); let path = format!("{}/1.json", TEST_DATA_BASE); let meta = local_unpartitioned_file(path); let schema = JsonFormat::default() @@ -215,7 +221,7 @@ mod tests { .await .unwrap(); - (store, vec![vec![meta.into()]], schema) + (store, store_url, vec![vec![meta.into()]], schema) } #[tokio::test] @@ -223,10 +229,13 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; - let (object_store, file_groups, file_schema) = prepare_store().await; + + let (object_store, object_store_url, file_groups, file_schema) = + prepare_store().await; let exec = NdJsonExec::new(FileScanConfig { object_store, + object_store_url, file_groups, file_schema, statistics: Statistics::default(), @@ -280,7 +289,8 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; - let (object_store, file_groups, actual_schema) = prepare_store().await; + let (object_store, object_store_url, file_groups, actual_schema) = + prepare_store().await; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -290,6 +300,7 @@ mod tests { let exec = NdJsonExec::new(FileScanConfig { object_store, + object_store_url, file_groups, file_schema, statistics: Statistics::default(), @@ -319,10 +330,12 @@ mod tests { async fn nd_json_exec_file_projection() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let (object_store, file_groups, file_schema) = prepare_store().await; + let (object_store, object_store_url, file_groups, file_schema) = + prepare_store().await; let exec = NdJsonExec::new(FileScanConfig { object_store, + object_store_url, file_groups, file_schema, statistics: Statistics::default(), diff --git 
a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 566b4c8d47ab0..a4a108c3d48d3 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -38,7 +38,9 @@ pub use csv::CsvExec; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; -use crate::datasource::listing::PartitionedFile; +use crate::datasource::{ + listing::PartitionedFile, object_store::ObjectStoreUrl, +}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -68,6 +70,8 @@ lazy_static! { pub struct FileScanConfig { /// Store from which the `files` should be fetched pub object_store: Arc, + /// Object store URL + pub object_store_url: ObjectStoreUrl, /// Schema before projection. It contains the columns that are expected /// to be in the files without the table partition columns. pub file_schema: SchemaRef, @@ -658,6 +662,7 @@ mod tests { file_groups: vec![vec![]], limit: None, object_store: TestObjectStore::new_arc(&[]), + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), projection, statistics, table_partition_cols, diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 0931484ef7708..9eda036a42267 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -640,6 +640,7 @@ mod tests { use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::datasource::file_format::test_util::scan_format; use crate::datasource::listing::FileRange; + use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::options::CsvReadOptions; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; @@ -681,6 +682,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![file_groups], file_schema, statistics: Statistics::default(), @@ -1067,6 +1069,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups, file_schema, statistics: Statistics::default(), @@ -1155,6 +1158,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: store, + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![partitioned_file]], file_schema: schema, statistics: Statistics::default(), @@ -1214,6 +1218,7 @@ mod tests { let parquet_exec = ParquetExec::new( FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![vec![partitioned_file]], file_schema: Arc::new(Schema::empty()), statistics: Statistics::default(), diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 815379de44dba..19a9db9a4ed2e 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -18,6 +18,7 @@ //! 
Common unit test utility methods use crate::arrow::array::UInt32Array; +use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; @@ -120,6 +121,7 @@ pub fn partitioned_csv_config( Ok(FileScanConfig { object_store: Arc::new(LocalFileSystem {}), + object_store_url: ObjectStoreUrl::local_filesystem(), file_schema: schema, file_groups, statistics: Default::default(), diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 51ddff2fb3024..947ebc699b48a 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -17,6 +17,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; +use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::error::Result; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{collect, ExecutionPlan}; @@ -81,21 +82,23 @@ async fn get_exec( let meta = local_unpartitioned_file(filename); let format = ParquetFormat::default(); - let store = Arc::new(LocalFileSystem {}) as _; + let object_store = Arc::new(LocalFileSystem {}) as _; + let object_store_url = ObjectStoreUrl::local_filesystem(); let file_schema = format - .infer_schema(&store, &[meta.clone()]) + .infer_schema(&object_store, &[meta.clone()]) .await .expect("Schema inference"); let statistics = format - .infer_stats(&store, file_schema.clone(), &meta) + .infer_stats(&object_store, file_schema.clone(), &meta) .await .expect("Stats inference"); let file_groups = vec![vec![meta.into()]]; let exec = format .create_physical_plan( FileScanConfig { - object_store: store, + object_store, + object_store_url, file_schema, file_groups, statistics, From 938e91611940387c3d62a6fb24b731cc1e8238fc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 May 2022 00:05:04 +0100 Subject: [PATCH 08/13] Update ballista pin --- dev/build-arrow-ballista.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh index 0c39226273102..8c2e687f308fa 100755 --- a/dev/build-arrow-ballista.sh +++ b/dev/build-arrow-ballista.sh @@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null # clone the repo # TODO make repo/branch configurable -git clone https://github.com/apache/arrow-ballista +git clone https://github.com/tustvold/arrow-ballista -b url-refactor # update dependencies to local crates python ./dev/make-ballista-deps-local.py From c1cf2da730281675db9b2e81990fcbf86fd3ae15 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 May 2022 00:10:48 +0100 Subject: [PATCH 09/13] Format --- datafusion/core/src/physical_plan/file_format/avro.rs | 8 ++++++-- datafusion/core/src/physical_plan/file_format/mod.rs | 4 +--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index b761a83062e93..8f4d30be0e174 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -246,7 +246,9 @@ mod tests { let object_store = Arc::new(LocalFileSystem {}) as _; let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); - let actual_schema = AvroFormat {}.infer_schema(&object_store, &[meta.clone()]).await?; + let actual_schema = AvroFormat {} + .infer_schema(&object_store, 
&[meta.clone()]) + .await?; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -314,7 +316,9 @@ mod tests { let object_store = Arc::new(LocalFileSystem {}) as _; let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); - let file_schema = AvroFormat {}.infer_schema(&object_store, &[meta.clone()]).await?; + let file_schema = AvroFormat {} + .infer_schema(&object_store, &[meta.clone()]) + .await?; let mut partitioned_file = PartitionedFile::from(meta); partitioned_file.partition_values = diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index a4a108c3d48d3..0fed497d3c611 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -38,9 +38,7 @@ pub use csv::CsvExec; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; -use crate::datasource::{ - listing::PartitionedFile, object_store::ObjectStoreUrl, -}; +use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, From c32e27ebe749b2b9bc66a65c677fa8ce5892f71b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 May 2022 00:27:36 +0100 Subject: [PATCH 10/13] Clippy --- datafusion/core/src/datasource/object_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index 6944a9a8c50b1..893f5df1a94af 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -40,7 +40,7 @@ impl ObjectStoreUrl { Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?; let remaining = &parsed[url::Position::BeforePath..]; - if remaining != "" && remaining != "/" { + if !remaining.is_empty() && remaining != "/" { return Err(DataFusionError::Execution(format!( "ObjectStoreUrl must only contain scheme and authority, got: {}", remaining From 61d67cff1d6d3a40fdc40bb8897a7aa8f18ea7dc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 May 2022 10:25:32 +0100 Subject: [PATCH 11/13] Consistent naming --- benchmarks/src/bin/tpch.rs | 4 +- datafusion-examples/examples/flight_server.rs | 5 +- .../core/benches/sort_limit_query_sql.rs | 4 +- datafusion/core/src/catalog/schema.rs | 29 +++++---- .../core/src/datasource/listing/table.rs | 16 ++--- .../core/src/datasource/object_store.rs | 21 +++---- datafusion/core/src/execution/context.rs | 60 +++++++++---------- datafusion/proto/src/logical_plan.rs | 6 +- 8 files changed, 77 insertions(+), 68 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index c7ae7b7e5bf51..c46badd64fa8a 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -426,8 +426,8 @@ fn get_table( table_partition_cols: vec![], }; - let uri = ListingTableUrl::parse(path)?; - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri) + let table_path = ListingTableUrl::parse(path)?; + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .with_listing_options(options) .with_schema(schema); diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index d7f07b3cac538..a3d7c0f566443 100644 --- a/datafusion-examples/examples/flight_server.rs +++ 
b/datafusion-examples/examples/flight_server.rs @@ -68,10 +68,11 @@ impl FlightService for FlightServiceImpl { let request = request.into_inner(); let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); - let url = ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?; + let table_path = + ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?; let schema = listing_options - .infer_schema(Arc::new(LocalFileSystem {}), &url) + .infer_schema(Arc::new(LocalFileSystem {}), &table_path) .await .unwrap(); diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs index 0b9905ba56d04..d1f253a982a51 100644 --- a/datafusion/core/benches/sort_limit_query_sql.rs +++ b/datafusion/core/benches/sort_limit_query_sql.rs @@ -66,12 +66,12 @@ fn create_context() -> Arc> { let testdata = datafusion::test_util::arrow_test_data(); let path = format!("{}/csv/aggregate_test_100.csv", testdata); - let url = ListingTableUrl::parse(path).unwrap(); + let table_path = ListingTableUrl::parse(path).unwrap(); // create CSV data source let listing_options = ListingOptions::new(Arc::new(CsvFormat::default())); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url) + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .with_listing_options(listing_options) .with_schema(schema); diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index cd823d1545d29..748cad52ba7fb 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -156,28 +156,33 @@ impl ObjectStoreSchemaProvider { .register_store(scheme.into(), object_store) } - /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store(&self, uri: &ListingTableUrl) -> Result> { + /// Retrieves a `ObjectStore` instance for a given Url + pub fn object_store( + &self, + url: impl AsRef, + ) -> Result> { self.object_store_registry .lock() - .get_by_url(uri) + .get_by_url(url) .map_err(DataFusionError::from) } /// If supported by the implementation, adds a new table to this schema by creating a - /// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`. + /// `ListingTable` from the provided `url` and a previously registered `ObjectStore`. /// If a table of the same name existed before, it returns "Table already exists" error. pub async fn register_listing_table( &self, name: &str, - uri: ListingTableUrl, + table_path: ListingTableUrl, config: Option, ) -> Result<()> { let config = match config { Some(cfg) => cfg, None => { - let object_store = self.object_store(&uri)?; - ListingTableConfig::new(object_store, uri).infer().await? + let object_store = self.object_store(&table_path)?; + ListingTableConfig::new(object_store, table_path) + .infer() + .await? 
} }; @@ -278,13 +283,13 @@ mod tests { async fn test_schema_register_listing_table() { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let uri = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename).unwrap(); let schema = ObjectStoreSchemaProvider::new(); let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); schema - .register_listing_table("alltypes_plain", uri, None) + .register_listing_table("alltypes_plain", table_path, None) .await .unwrap(); @@ -360,18 +365,18 @@ mod tests { async fn test_schema_register_same_listing_table() { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let uri = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename).unwrap(); let schema = ObjectStoreSchemaProvider::new(); let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {})); schema - .register_listing_table("alltypes_plain", uri.clone(), None) + .register_listing_table("alltypes_plain", table_path.clone(), None) .await .unwrap(); schema - .register_listing_table("alltypes_plain", uri, None) + .register_listing_table("alltypes_plain", table_path, None) .await .unwrap(); } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3b0f4340b4884..34e44971d6625 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -438,10 +438,12 @@ mod tests { async fn load_table_stats_by_default() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let uri = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename).unwrap(); let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); - let schema = opt.infer_schema(Arc::new(LocalFileSystem {}), &uri).await?; - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri) + let schema = opt + .infer_schema(Arc::new(LocalFileSystem {}), &table_path) + .await?; + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .with_listing_options(opt) .with_schema(schema); let table = ListingTable::try_new(config)?; @@ -563,8 +565,8 @@ mod tests { async fn load_table(name: &str) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); - let uri = ListingTableUrl::parse(filename).unwrap(); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), uri) + let table_path = ListingTableUrl::parse(filename).unwrap(); + let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) .infer() .await?; let table = ListingTable::try_new(config)?; @@ -594,8 +596,8 @@ mod tests { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - let uri = ListingTableUrl::parse(table_prefix).unwrap(); - let config = ListingTableConfig::new(mock_store, uri) + let table_path = ListingTableUrl::parse(table_prefix).unwrap(); + let config = ListingTableConfig::new(mock_store, table_path) .with_listing_options(opt) .with_schema(Arc::new(schema)); diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index 893f5df1a94af..e10b614cff319 100644 --- a/datafusion/core/src/datasource/object_store.rs 
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -133,10 +133,11 @@ impl ObjectStoreRegistry {
        stores.get(scheme).cloned()
    }

-    /// Get a suitable store for the URI based on it's scheme. For example:
-    /// - URI with scheme `file://` or no schema will return the default LocalFS store
-    /// - URI with scheme `s3://` will return the S3 store if it's registered
-    /// Returns a tuple with the store and the self-described uri of the file in that store
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no scheme will return the default LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
        let url = url.as_ref();
        let stores = self.object_stores.read();
@@ -185,21 +186,21 @@ mod tests {
    fn test_get_by_url_s3() {
        let sut = ObjectStoreRegistry::default();
        sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
-        let uri = ListingTableUrl::parse("s3://bucket/key").unwrap();
-        sut.get_by_url(&uri).unwrap();
+        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
+        sut.get_by_url(&url).unwrap();
    }

    #[test]
    fn test_get_by_url_file() {
        let sut = ObjectStoreRegistry::default();
-        let uri = ListingTableUrl::parse("file:///bucket/key").unwrap();
-        sut.get_by_url(&uri).unwrap();
+        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
+        sut.get_by_url(&url).unwrap();
    }

    #[test]
    fn test_get_by_url_local() {
        let sut = ObjectStoreRegistry::default();
-        let uri = ListingTableUrl::parse("../").unwrap();
-        sut.get_by_url(&uri).unwrap();
+        let url = ListingTableUrl::parse("../").unwrap();
+        sut.get_by_url(&url).unwrap();
    }
}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 71eb44563542d..4d579776e6660 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -519,11 +519,11 @@ impl SessionContext {
    /// Creates a DataFrame for reading an Avro data source.
    pub async fn read_avro(
        &self,
-        uri: impl AsRef<str>,
+        table_path: impl AsRef<str>,
        options: AvroReadOptions<'_>,
    ) -> Result<Arc<DataFrame>> {
-        let uri = ListingTableUrl::parse(uri)?;
-        let object_store = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
        let target_partitions = self.copied_config().target_partitions;

        let listing_options = options.to_listing_options(target_partitions);
@@ -532,12 +532,12 @@ impl SessionContext {
            Some(s) => s,
            None => {
                listing_options
-                    .infer_schema(Arc::clone(&object_store), &uri)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                    .await?
            }
        };

-        let config = ListingTableConfig::new(object_store, uri)
+        let config = ListingTableConfig::new(object_store, table_path)
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);
        let provider = ListingTable::try_new(config)?;
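The scheme-based dispatch in `get_by_url` is easiest to see in isolation. A minimal sketch, reusing `LocalFileSystem` as a stand-in for a real S3 store exactly as the tests above do; the `ObjectStoreRegistry` import path is an assumption:

```rust
use std::sync::Arc;

use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::Result;

fn lookup_example() -> Result<()> {
    let registry = ObjectStoreRegistry::default();

    // Any scheme can map to any store implementation; the local
    // filesystem stands in for an S3 client here, as in the tests.
    registry.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));

    // `get_by_url` keys purely off `url.scheme()`: "s3" resolves to the
    // store registered above, while `file://` URLs and bare paths fall
    // back to the default local store.
    let _s3 = registry.get_by_url(&ListingTableUrl::parse("s3://bucket/key")?)?;
    let _local = registry.get_by_url(&ListingTableUrl::parse("../")?)?;
    Ok(())
}
```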
@@ -547,11 +547,11 @@ impl SessionContext {
    /// Creates a DataFrame for reading an Json data source.
    pub async fn read_json(
        &mut self,
-        uri: impl AsRef<str>,
+        table_path: impl AsRef<str>,
        options: NdJsonReadOptions<'_>,
    ) -> Result<Arc<DataFrame>> {
-        let uri = ListingTableUrl::parse(uri)?;
-        let object_store = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
        let target_partitions = self.copied_config().target_partitions;

        let listing_options = options.to_listing_options(target_partitions);
@@ -560,11 +560,11 @@ impl SessionContext {
            Some(s) => s,
            None => {
                listing_options
-                    .infer_schema(Arc::clone(&object_store), &uri)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                    .await?
            }
        };
-        let config = ListingTableConfig::new(object_store, uri)
+        let config = ListingTableConfig::new(object_store, table_path)
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);
        let provider = ListingTable::try_new(config)?;
@@ -583,22 +583,22 @@ impl SessionContext {
    /// Creates a DataFrame for reading a CSV data source.
    pub async fn read_csv(
        &self,
-        uri: impl AsRef<str>,
+        table_path: impl AsRef<str>,
        options: CsvReadOptions<'_>,
    ) -> Result<Arc<DataFrame>> {
-        let uri = ListingTableUrl::parse(uri)?;
-        let object_store = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
        let target_partitions = self.copied_config().target_partitions;
        let listing_options = options.to_listing_options(target_partitions);
        let resolved_schema = match options.schema {
            Some(s) => Arc::new(s.to_owned()),
            None => {
                listing_options
-                    .infer_schema(Arc::clone(&object_store), &uri)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                    .await?
            }
        };
-        let config = ListingTableConfig::new(object_store, uri.clone())
+        let config = ListingTableConfig::new(object_store, table_path.clone())
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);

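All four `read_*` methods now share the same parse-then-resolve shape, so the caller-facing change is simply that any string accepted by `ListingTableUrl::parse` works. A sketch of the call site (the CSV path is hypothetical):

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn read_example() -> Result<()> {
    let ctx = SessionContext::new();

    // `table_path` may be a relative path, an absolute path, a glob
    // such as `data/*.csv`, or a fully-qualified URL.
    let df = ctx
        .read_csv("tests/example.csv", CsvReadOptions::new())
        .await?;
    df.show().await?;
    Ok(())
}
```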
@@ -609,21 +609,21 @@ impl SessionContext {
    /// Creates a DataFrame for reading a Parquet data source.
    pub async fn read_parquet(
        &self,
-        uri: impl AsRef<str>,
+        table_path: impl AsRef<str>,
        options: ParquetReadOptions<'_>,
    ) -> Result<Arc<DataFrame>> {
-        let uri = ListingTableUrl::parse(uri)?;
-        let object_store = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
        let target_partitions = self.copied_config().target_partitions;

        let listing_options = options.to_listing_options(target_partitions);

        // with parquet we resolve the schema in all cases
        let resolved_schema = listing_options
-            .infer_schema(Arc::clone(&object_store), &uri)
+            .infer_schema(Arc::clone(&object_store), &table_path)
            .await?;

-        let config = ListingTableConfig::new(object_store, uri)
+        let config = ListingTableConfig::new(object_store, table_path)
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);

@@ -646,11 +646,11 @@ impl SessionContext {
    pub async fn register_listing_table(
        &self,
        name: &str,
-        uri: impl AsRef<str>,
+        table_path: impl AsRef<str>,
        options: ListingOptions,
        provided_schema: Option<SchemaRef>,
    ) -> Result<()> {
-        let table_path = ListingTableUrl::parse(uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
        let object_store = self.runtime_env().object_store(&table_path)?;
        let resolved_schema = match provided_schema {
            None => {
@@ -673,7 +673,7 @@ impl SessionContext {
    pub async fn register_csv(
        &self,
        name: &str,
-        uri: &str,
+        table_path: &str,
        options: CsvReadOptions<'_>,
    ) -> Result<()> {
        let listing_options =
@@ -681,7 +681,7 @@ impl SessionContext {

        self.register_listing_table(
            name,
-            uri,
+            table_path,
            listing_options,
            options.schema.map(|s| Arc::new(s.to_owned())),
        )
@@ -695,13 +695,13 @@ impl SessionContext {
    pub async fn register_json(
        &self,
        name: &str,
-        uri: &str,
+        table_path: &str,
        options: NdJsonReadOptions<'_>,
    ) -> Result<()> {
        let listing_options =
            options.to_listing_options(self.copied_config().target_partitions);

-        self.register_listing_table(name, uri, listing_options, options.schema)
+        self.register_listing_table(name, table_path, listing_options, options.schema)
            .await?;
        Ok(())
    }
@@ -711,7 +711,7 @@ impl SessionContext {
    pub async fn register_parquet(
        &self,
        name: &str,
-        uri: &str,
+        table_path: &str,
        options: ParquetReadOptions<'_>,
    ) -> Result<()> {
        let (target_partitions, parquet_pruning) = {
@@ -722,7 +722,7 @@ impl SessionContext {
            .parquet_pruning(parquet_pruning)
            .to_listing_options(target_partitions);

-        self.register_listing_table(name, uri, listing_options, None)
+        self.register_listing_table(name, table_path, listing_options, None)
            .await?;
        Ok(())
    }
@@ -732,13 +732,13 @@ impl SessionContext {
    pub async fn register_avro(
        &self,
        name: &str,
-        uri: &str,
+        table_path: &str,
        options: AvroReadOptions<'_>,
    ) -> Result<()> {
        let listing_options =
            options.to_listing_options(self.copied_config().target_partitions);

-        self.register_listing_table(name, uri, listing_options, options.schema)
+        self.register_listing_table(name, table_path, listing_options, options.schema)
            .await?;
        Ok(())
    }
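The `register_*` helpers stay thin wrappers over `register_listing_table`, so registering a path and then querying it by name is unchanged from the caller's perspective. A hedged usage sketch (table name and file path invented, import paths from memory):

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

async fn register_and_query() -> Result<()> {
    let ctx = SessionContext::new();

    // Registration parses the path and infers the schema up front;
    // the actual file listing happens when a query executes.
    ctx.register_parquet("t", "data/t.parquet", ParquetReadOptions::default())
        .await?;

    let df = ctx.sql("SELECT COUNT(*) FROM t").await?;
    df.show().await?;
    Ok(())
}
```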
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 9b2cba3f50f40..4d927339315bf 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -411,7 +411,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                    FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
                };

-                let uri = ListingTableUrl::parse(&scan.path)?;
+                let table_path = ListingTableUrl::parse(&scan.path)?;
                let options = ListingOptions {
                    file_extension: scan.file_extension.clone(),
                    format: file_format,
@@ -420,7 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                    target_partitions: scan.target_partitions as usize,
                };

-                let object_store = ctx.runtime_env().object_store(&uri)?;
+                let object_store = ctx.runtime_env().object_store(&table_path)?;

                println!(
                    "Found object store {:?} for path {}",
@@ -428,7 +428,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                    scan.path.as_str()
                );

-                let config = ListingTableConfig::new(object_store, uri)
+                let config = ListingTableConfig::new(object_store, table_path)
                    .with_listing_options(options)
                    .with_schema(Arc::new(schema));

From a504fe84737068c75f2775157f39eaae642a1a13 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 30 May 2022 13:53:30 +0100
Subject: [PATCH 12/13] Review feedback

---
 .../core/src/datasource/listing/helpers.rs    | 21 +++++++++++++++++++
 datafusion/core/src/datasource/listing/mod.rs |  4 ++--
 .../datasource/listing/{path.rs => url.rs}    |  5 ++++-
 .../core/src/datasource/object_store.rs       | 16 ++++++++++++--
 4 files changed, 41 insertions(+), 5 deletions(-)
 rename datafusion/core/src/datasource/listing/{path.rs => url.rs} (98%)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 330517563e580..9978c0a590ec5 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -580,6 +580,27 @@ mod tests {
        );
    }

+    #[cfg(target_os = "windows")]
+    #[test]
+    fn test_parse_partitions_for_path_windows() {
+        assert_eq!(
+            Some(vec!["v1"]),
+            parse_partitions_for_path(
+                "bucket\\mytable",
+                "bucket\\mytable\\mypartition=v1\\file.csv",
+                &[String::from("mypartition")]
+            )
+        );
+        assert_eq!(
+            Some(vec!["v1", "v2"]),
+            parse_partitions_for_path(
+                "bucket\\mytable",
+                "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv",
+                &[String::from("mypartition"), String::from("otherpartition")]
+            )
+        );
+    }
+
    #[test]
    fn test_path_batch_roundtrip_no_partiton() {
        let files = vec![
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index a4b68f45f0d3f..c11de5f8021ad 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -19,15 +19,15 @@
 //! to get the list of files to process.

 mod helpers;
-mod path;
 mod table;
+mod url;

 use datafusion_common::ScalarValue;
 use datafusion_data_access::{FileMeta, Result, SizedFile};
 use futures::Stream;
 use std::pin::Pin;

-pub use path::ListingTableUrl;
+pub use self::url::ListingTableUrl;
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};

 /// Stream of files get listed from object store
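To make the documented resolution rules concrete before the rename diff below, here is a sketch of how `ListingTableUrl::parse` treats its input. The glob example follows the doc comment in `url.rs`; the exact expansion and the local-path canonicalisation behaviour are assumptions, and the paths are illustrative:

```rust
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;

fn parse_examples() -> Result<()> {
    // Fully-qualified URLs are taken as-is.
    let _file = ListingTableUrl::parse("file:///myfile.txt")?;
    let _s3 = ListingTableUrl::parse("s3://bucket/foo/bar")?;

    // A schemeless path resolves against the local filesystem, and a
    // trailing glob expression is split off and applied when listing.
    let _glob = ListingTableUrl::parse("/data/year=*/month=*/*.parquet")?;

    // `strip_prefix` yields the path segments below the table root,
    // and returns `None` for object paths outside it (see the new
    // assertion in the test below).
    let url = ListingTableUrl::parse("s3://bucket/foo/bar")?;
    assert!(url
        .strip_prefix("other-bucket/foo/bar/partition/foo.parquet")
        .is_none());
    Ok(())
}
```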
diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/url.rs
similarity index 98%
rename from datafusion/core/src/datasource/listing/path.rs
rename to datafusion/core/src/datasource/listing/url.rs
index 2b0e3344a3c21..041a6ab7f0452 100644
--- a/datafusion/core/src/datasource/listing/path.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -56,7 +56,7 @@ impl ListingTableUrl {
    /// machine you must provide it as a fully-qualified [file URI]
    /// e.g. `file:///myfile.txt`
    ///
-    /// ## Glob Paths
+    /// ## Glob File Paths
    ///
    /// If no scheme is provided, and the path contains a glob expression, it will
    /// be resolved as follows.
@@ -264,6 +264,9 @@ mod tests {
        let path = "bucket/foo/bar/partition/foo.parquet";
        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+
+        let path = "other-bucket/foo/bar/partition/foo.parquet";
+        assert!(url.strip_prefix(path).is_none());
    }

    #[test]
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index e10b614cff319..ac7e1a847075a 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -140,8 +140,7 @@ impl ObjectStoreRegistry {
    ///
    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
        let url = url.as_ref();
-        let stores = self.object_stores.read();
-        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {
+        let store = self.get(url.scheme()).ok_or_else(|| {
            DataFusionError::Internal(format!(
                "No suitable object store found for {}",
                url
@@ -175,11 +174,24 @@ mod tests {
        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
        assert_eq!(url.as_str(), "s3://bucket/");

+        let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
+        assert_eq!(url.as_str(), "s3://username:password@host:123/");
+
+        let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err();
+        assert_eq!(err.to_string(), "External error: invalid port number");
+
        let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err();
        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?");

        let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err();
        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar");
+
+        let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
+
+        let err =
+            ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
    }

    #[test]

From 0ec7433b63e3a59efffaf74f14132aa84d2a2dbe Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 30 May 2022 15:55:28 +0100
Subject: [PATCH 13/13] Fix windows test

---
 datafusion/core/src/datasource/listing/helpers.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 9978c0a590ec5..a26eafabb50ca 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -586,7 +586,7 @@ mod tests {
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
-                "bucket\\mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                "bucket\\mytable\\mypartition=v1\\file.csv",
                &[String::from("mypartition")]
            )
@@ -594,7 +594,7 @@ mod tests {
        assert_eq!(
            Some(vec!["v1", "v2"]),
            parse_partitions_for_path(
-                "bucket\\mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv",
                &[String::from("mypartition"), String::from("otherpartition")]
            )
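Taken together, the new assertions pin down the `ObjectStoreUrl` contract: scheme plus authority only, normalised with a trailing slash, and anything beyond the authority rejected. A condensed sketch of that contract (the import path is an assumption):

```rust
use datafusion::datasource::object_store::ObjectStoreUrl;

fn object_store_url_contract() {
    // Valid: scheme + authority, normalised with a trailing slash.
    let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
    assert_eq!(url.as_str(), "s3://bucket/");

    // Credentials and ports are part of the authority, so they pass.
    assert!(ObjectStoreUrl::parse("s3://username:password@host:123").is_ok());

    // A path or query string after the authority is an error.
    assert!(ObjectStoreUrl::parse("s3://host:123/foo").is_err());
    assert!(ObjectStoreUrl::parse("s3://bucket?foo=bar").is_err());
}
```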