native/core/src/parquet/objectstore/s3.rs (1 addition, 1 deletion)

@@ -66,7 +66,7 @@ pub fn create_store(
             source: format!("Scheme of URL is not S3: {url}").into(),
         });
     }
-    let path = Path::from_url_path(path)?;
+    let path = Path::parse(path)?;

     let mut builder = AmazonS3Builder::new()
         .with_url(url.to_string())
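The one-line Rust change above is the substance of the fix. `Path::from_url_path` percent-decodes its input before building the object key, so a key that literally contains `%23` or `%20` is silently rewritten (`%23` becomes `#`, `%20` becomes a space) and the subsequent S3 lookup targets an object that was never written. `Path::parse` keeps the string verbatim. A minimal standalone sketch of the contrast, using the object_store crate directly rather than Comet's code:

// Sketch: contrast object_store's two Path constructors on a
// percent-encoded key. The key below mirrors the test path used
// later in this PR, with the bucket prefix dropped.
use object_store::path::Path;

fn main() {
    let raw = "data/Brand%2321/test%20file.parquet";

    // Path::parse keeps the string verbatim: "%23" and "%20"
    // survive, matching the object key actually written to S3.
    let parsed = Path::parse(raw).unwrap();
    assert_eq!(parsed.to_string(), "data/Brand%2321/test%20file.parquet");

    // Path::from_url_path percent-decodes first: "%23" -> "#" and
    // "%20" -> " ", so the resulting key no longer matches.
    let decoded = Path::from_url_path(raw).unwrap();
    assert_eq!(decoded.to_string(), "data/Brand#21/test file.parquet");
}

Both constructors return Result, so the `?` in create_store continues to type-check after the swap.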
The second file in the diff is the ParquetReadFromS3Suite test suite (its full path is not shown in this view):

@@ -28,11 +28,12 @@ import org.testcontainers.utility.DockerImageName

 import org.apache.spark.SparkConf
 import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.comet.CometNativeScanExec
 import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.functions.{col, sum}
+import org.apache.spark.sql.functions.{col, expr, max, sum}

 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider

@@ -102,20 +103,45 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
     df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
   }

-  test("read parquet file from MinIO") {
+  private def writePartitionedParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000).withColumn("val", expr("concat('val#', id % 10)"))
+    df.write.format("parquet").partitionBy("val").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometScan(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometScanExec => p
+      case p: CometNativeScanExec => p
+    }
+    assert(scans.size == 1)
+  }
+
+  test("read parquet file from MinIO") {
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)

     val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
-    val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
-    }
-    assert(scans.size == 1)
+    assertCometScan(df)
     assert(df.first().getLong(0) == 499500)
   }
+
+  test("read partitioned parquet file from MinIO") {
+    val testFilePath = s"s3a://$testBucketName/data/test-partitioned-file.parquet"
+    writePartitionedParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
+    val firstRow = df.first()
+    assert(firstRow.getLong(0) == 499500)
+    assert(firstRow.getString(1) == "val#9")
+  }
+
+  test("read parquet file from MinIO with URL escape sequences in path") {
+    // Path with '%23' and '%20' which are URL escape sequences for '#' and ' '
+    val testFilePath = s"s3a://$testBucketName/data/Brand%2321/test%20file.parquet"
+    writeTestParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+    assertCometScan(df)
+    assert(df.first().getLong(0) == 499500)
+  }
 }
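A note on why the partitioned test also exercises the fix (an inference from the test data, not stated in the diff): Spark uses Hive-style escaping for partition directory names, so a partition value such as val#9 is expected to land in a directory named val=val%239. With the old Path::from_url_path call, that %23 would be decoded back to # during the native scan and the read would look up a key that was never written. A sketch of the round-trip under that assumption; the part-file name is hypothetical:

// Sketch: a Hive-escaped partition directory must survive Path
// construction unchanged. Assumes Hive-style escaping of '#' as
// "%23" in partition directory names; the part-file name below
// is made up for illustration.
use object_store::path::Path;

fn main() {
    // Directory name as Spark would write it for partition value "val#9".
    let on_disk = "data/test-partitioned-file.parquet/val=val%239/part-00000.parquet";

    // Verbatim parse: the key matches what is actually in the bucket.
    let verbatim = Path::parse(on_disk).unwrap();
    assert_eq!(verbatim.to_string(), on_disk);

    // Percent-decoding parse: "%23" becomes "#", producing a key
    // ("...val=val#9...") that does not exist, so reads would miss.
    let decoded = Path::from_url_path(on_disk).unwrap();
    assert_ne!(decoded.to_string(), on_disk);
}

The numeric assertion shared by all three tests is just the closed-form sum 0 + 1 + ... + 999 = 999 * 1000 / 2 = 499500 over spark.range(0, 1000).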