diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs
index e93f99a87b..cc257bb826 100644
--- a/native/core/src/parquet/objectstore/s3.rs
+++ b/native/core/src/parquet/objectstore/s3.rs
@@ -66,7 +66,7 @@ pub fn create_store(
             source: format!("Scheme of URL is not S3: {url}").into(),
         });
     }
-    let path = Path::from_url_path(path)?;
+    let path = Path::parse(path)?;
 
     let mut builder = AmazonS3Builder::new()
         .with_url(url.to_string())
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index f94e53ed94..fb5f90580e 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -28,11 +28,12 @@ import org.testcontainers.utility.DockerImageName
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.comet.CometNativeScanExec
 import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.functions.{col, sum}
+import org.apache.spark.sql.functions.{col, expr, max, sum}
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
@@ -102,20 +103,45 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
     df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
   }
 
-  test("read parquet file from MinIO") {
+  private def writePartitionedParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000).withColumn("val", expr("concat('val#', id % 10)"))
+    df.write.format("parquet").partitionBy("val").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometScan(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometScanExec => p
+      case p: CometNativeScanExec => p
+    }
+    assert(scans.size == 1)
+  }
+
+  test("read parquet file from MinIO") {
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
 
     val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
-    val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
-    }
-    assert(scans.size == 1)
+    assertCometScan(df)
+    assert(df.first().getLong(0) == 499500)
+  }
+
+  test("read partitioned parquet file from MinIO") {
+    val testFilePath = s"s3a://$testBucketName/data/test-partitioned-file.parquet"
+    writePartitionedParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
+    val firstRow = df.first()
+    assert(firstRow.getLong(0) == 499500)
+    assert(firstRow.getString(1) == "val#9")
+  }
+
+  test("read parquet file from MinIO with URL escape sequences in path") {
+    // Path with '%23' and '%20', which are the URL escape sequences for '#' and ' '
+    val testFilePath = s"s3a://$testBucketName/data/Brand%2321/test%20file.parquet"
+    writeTestParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+    assertCometScan(df)
     assert(df.first().getLong(0) == 499500)
   }
 }
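
Context for the one-line Rust change (this note and the sketch below are not part of the
patch): `object_store`'s `Path::from_url_path` percent-decodes its input, so a key that
s3a wrote with a literal `%23` in its name was decoded to `#` and the native reader
looked up a key that does not exist; `Path::parse` keeps the string verbatim. A minimal
sketch of the difference, assuming only the `object_store` crate as a dependency:

    use object_store::path::Path;

    fn main() {
        // The same key the new test writes: s3a stores the object name with the
        // literal characters '%23' and '%20', without URL-decoding them.
        let raw = "data/Brand%2321/test%20file.parquet";

        // `parse` preserves the percent sequences, matching what s3a wrote.
        assert_eq!(Path::parse(raw).unwrap().to_string(), raw);

        // `from_url_path` percent-decodes them, yielding a different key.
        assert_eq!(
            Path::from_url_path(raw).unwrap().to_string(),
            "data/Brand#21/test file.parquet"
        );
    }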