From 20dc9ac2eba01835a8eafbc573eadc9e28279970 Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Wed, 13 Aug 2025 17:05:33 +0800
Subject: [PATCH 1/4] Fix repeated URL-decoding of path when reading parquet
 from S3 using the native parquet reader

---
 native/core/src/parquet/objectstore/s3.rs |  2 +-
 .../parquet/ParquetReadFromS3Suite.scala  | 25 +++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs
index e93f99a87b..cc257bb826 100644
--- a/native/core/src/parquet/objectstore/s3.rs
+++ b/native/core/src/parquet/objectstore/s3.rs
@@ -66,7 +66,7 @@ pub fn create_store(
             source: format!("Scheme of URL is not S3: {url}").into(),
         });
     }
-    let path = Path::from_url_path(path)?;
+    let path = Path::parse(path)?;
 
     let mut builder = AmazonS3Builder::new()
         .with_url(url.to_string())
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index f94e53ed94..5ed57aef46 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.comet.CometNativeScanExec
 import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{col, sum}
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.functions.max
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
@@ -118,4 +120,27 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
 
     assert(df.first().getLong(0) == 499500)
   }
+
+  private def writePartitionedTestParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000).withColumn("val", expr("concat('val#', id % 10)"))
+    df.write.format("parquet").partitionBy("val").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  test("write partitioned data and read from MinIO") {
+    val testFilePath = s"s3a://$testBucketName/data/test-partitioned"
+    writePartitionedTestParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometScanExec =>
+        p
+      case p: CometNativeScanExec =>
+        p
+    }
+    assert(scans.size == 1)
+
+    val firstRow = df.first()
+    assert(firstRow.getLong(0) == 499500)
+    assert(firstRow.getString(1) == "val#9")
+  }
 }
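The one-line change above works because object_store's Path::from_url_path
percent-decodes its argument, while Path::parse takes it verbatim. By the time
create_store receives the path, it is already the literal object key, so
decoding it again mangles keys that contain literal escape sequences such as
'%20'. A minimal sketch of the difference, assuming only the object_store
crate's documented Path API (the object key below is illustrative, not taken
from the Comet code base):

    // Sketch of the double-decode problem fixed by PATCH 1/4.
    use object_store::path::Path;

    fn main() {
        // An S3 object key containing a literal percent sequence.
        let key = "data/test%20file.parquet";

        // from_url_path treats the input as percent-encoded: '%20' is
        // decoded to a space, so a key that literally contains "%20"
        // would be looked up as "data/test file.parquet" -- the wrong
        // object.
        let decoded = Path::from_url_path(key).unwrap();
        assert_eq!(decoded.to_string(), "data/test file.parquet");

        // parse keeps the input verbatim; the literal "%20" survives
        // and the correct object is requested.
        let parsed = Path::parse(key).unwrap();
        assert_eq!(parsed.to_string(), "data/test%20file.parquet");
    }

This is the behavior that the escape-sequence test added in PATCH 3/4
exercises end to end against MinIO.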
From 6ab434a9634234e21f63f559cb69c8589b143fe3 Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Wed, 13 Aug 2025 18:00:59 +0800
Subject: [PATCH 2/4] Make ParquetReadFromS3Suite run using all scan impls

---
 .../parquet/ParquetReadFromS3Suite.scala | 60 ++++++++++++-------
 1 file changed, 38 insertions(+), 22 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index 5ed57aef46..eb04f7c8d5 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.functions.{col, sum}
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.functions.max
 
+import org.apache.comet.CometConf
+
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
 import software.amazon.awssdk.services.s3.S3Client
@@ -109,16 +111,22 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
 
-    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
-    val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
-    }
-    assert(scans.size == 1)
-
-    assert(df.first().getLong(0) == 499500)
+    Seq(
+      CometConf.SCAN_NATIVE_COMET,
+      CometConf.SCAN_NATIVE_DATAFUSION,
+      CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scanMode => {
+      withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+        val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+        val scans = collect(df.queryExecution.executedPlan) {
+          case p: CometScanExec =>
+            p
+          case p: CometNativeScanExec =>
+            p
+        }
+        assert(scans.size == 1)
+        assert(df.first().getLong(0) == 499500)
+      }
+    })
   }
 
   private def writePartitionedTestParquetFile(filePath: String): Unit = {
@@ -130,17 +138,25 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
     val testFilePath = s"s3a://$testBucketName/data/test-partitioned"
     writePartitionedTestParquetFile(testFilePath)
 
-    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
-    val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
-    }
-    assert(scans.size == 1)
-
-    val firstRow = df.first()
-    assert(firstRow.getLong(0) == 499500)
-    assert(firstRow.getString(1) == "val#9")
+    Seq(
+      CometConf.SCAN_NATIVE_COMET,
+      CometConf.SCAN_NATIVE_DATAFUSION,
+      CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scanMode => {
+      withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+        val df =
+          spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
+        val scans = collect(df.queryExecution.executedPlan) {
+          case p: CometScanExec =>
+            p
+          case p: CometNativeScanExec =>
+            p
+        }
+        assert(scans.size == 1)
+
+        val firstRow = df.first()
+        assert(firstRow.getLong(0) == 499500)
+        assert(firstRow.getString(1) == "val#9")
+      }
+    })
   }
 }
From 0643e4192e08f7a8181a4fae58373ce062347150 Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Thu, 14 Aug 2025 13:18:35 +0800
Subject: [PATCH 3/4] test: support URL escape sequences in write path

---
 .../comet/parquet/ParquetReadFromS3Suite.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
index eb04f7c8d5..b7c04b4e14 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala
@@ -159,4 +159,21 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
       }
     })
   }
+
+  test("read parquet file from MinIO with URL escape sequences in path") {
+    // Path with '%20' which is a URL escape for space
+    val testFilePath = s"s3a://$testBucketName/data/test%20file.parquet"
+    writeTestParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometScanExec =>
+        p
+      case p: CometNativeScanExec =>
+        p
+    }
+    assert(scans.size == 1)
+
+    assert(df.first().getLong(0) == 499500)
+  }
 }
s"s3a://$testBucketName/data/test-partitioned" - writePartitionedTestParquetFile(testFilePath) - - Seq( - CometConf.SCAN_NATIVE_COMET, - CometConf.SCAN_NATIVE_DATAFUSION, - CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scanMode => { - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) { - val df = - spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val"))) - val scans = collect(df.queryExecution.executedPlan) { - case p: CometScanExec => - p - case p: CometNativeScanExec => - p - } - assert(scans.size == 1) - - val firstRow = df.first() - assert(firstRow.getLong(0) == 499500) - assert(firstRow.getString(1) == "val#9") - } - }) + val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val"))) + val firstRow = df.first() + assert(firstRow.getLong(0) == 499500) + assert(firstRow.getString(1) == "val#9") } test("read parquet file from MinIO with URL escape sequences in path") { - // Path with '%20' which is a URL escape for space - val testFilePath = s"s3a://$testBucketName/data/test%20file.parquet" + // Path with '%23' and '%20' which are URL escape sequences for '#' and ' ' + val testFilePath = s"s3a://$testBucketName/data/Brand%2321/test%20file.parquet" writeTestParquetFile(testFilePath) val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id"))) - val scans = collect(df.queryExecution.executedPlan) { - case p: CometScanExec => - p - case p: CometNativeScanExec => - p - } - assert(scans.size == 1) - + assertCometScan(df) assert(df.first().getLong(0) == 499500) } }