Skip to content
Merged
43 changes: 41 additions & 2 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ under the License.
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<!-- Iceberg dependencies for testing native Iceberg scan -->
<!-- Note: The specific Iceberg artifact is defined in profiles below based on Spark version -->
<!-- Jetty and Iceberg dependencies for testing native Iceberg scan -->
<!-- Note: The specific versions are defined in profiles below based on Spark version -->
</dependencies>

<profiles>
Expand All @@ -188,6 +188,19 @@ under the License.
<version>1.5.2</version>
<scope>test</scope>
</dependency>
<!-- Jetty 9.4.x for Spark 3.4 (JDK 11, javax.* packages) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.53.v20231009</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.53.v20231009</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>

Expand All @@ -203,6 +216,19 @@ under the License.
<version>1.8.1</version>
<scope>test</scope>
</dependency>
<!-- Jetty 9.4.x for Spark 3.5 (JDK 11, javax.* packages) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.53.v20231009</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.53.v20231009</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>

Expand All @@ -215,6 +241,19 @@ under the License.
<version>1.10.0</version>
<scope>test</scope>
</dependency>
<!-- Jetty 11.x for Spark 4.0 (jakarta.servlet) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>11.0.24</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>11.0.24</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,11 @@ object IcebergReflection extends Logging {
case _: NoSuchMethodException =>
try {
// If not directly available, access via operations/metadata
val opsMethod = table.getClass.getMethod("operations")
val opsMethod = table.getClass.getDeclaredMethod("operations")
opsMethod.setAccessible(true)
Comment on lines +207 to +208
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe add a helper method for this pattern of getDeclaredMethod and setAccessible since this is repeated. Can be in a future PR

val ops = opsMethod.invoke(table)
val currentMethod = ops.getClass.getMethod("current")
val currentMethod = ops.getClass.getDeclaredMethod("current")
currentMethod.setAccessible(true)
val metadata = currentMethod.invoke(ops)
val formatVersionMethod = metadata.getClass.getMethod("formatVersion")
Some(formatVersionMethod.invoke(metadata).asInstanceOf[Int])
Expand Down Expand Up @@ -274,10 +276,12 @@ object IcebergReflection extends Logging {
*/
def getTableMetadata(table: Any): Option[Any] = {
try {
val operationsMethod = table.getClass.getMethod("operations")
val operationsMethod = table.getClass.getDeclaredMethod("operations")
operationsMethod.setAccessible(true)
val operations = operationsMethod.invoke(table)

val currentMethod = operations.getClass.getMethod("current")
val currentMethod = operations.getClass.getDeclaredMethod("current")
currentMethod.setAccessible(true)
Some(currentMethod.invoke(operations))
} catch {
case e: Exception =>
Expand Down
59 changes: 45 additions & 14 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,56 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
// Extract all Iceberg metadata once using reflection.
// If any required reflection fails, this returns None, and we fall back to Spark.
// First get metadataLocation and catalogProperties which are needed by the factory.
val metadataLocationOpt = IcebergReflection
.getTable(scanExec.scan)
.flatMap(IcebergReflection.getMetadataLocation)
val tableOpt = IcebergReflection.getTable(scanExec.scan)

val metadataLocationOpt = tableOpt.flatMap { table =>
IcebergReflection.getMetadataLocation(table)
}

val metadataOpt = metadataLocationOpt.flatMap { metadataLocation =>
try {
val session = org.apache.spark.sql.SparkSession.active
val hadoopConf = session.sessionState.newHadoopConf()

// For REST catalogs, the metadata file may not exist on disk since metadata
// is fetched via HTTP. Check if file exists; if not, use table location instead.
val metadataUri = new java.net.URI(metadataLocation)
val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, metadataUri)

val metadataFile = new java.io.File(metadataUri.getPath)

val effectiveLocation =
if (!metadataFile.exists() && metadataUri.getScheme == "file") {
// Metadata file doesn't exist (REST catalog with InMemoryFileIO or similar)
// Use table location instead for FileIO initialization

tableOpt
.flatMap { table =>
try {
val locationMethod = table.getClass.getMethod("location")
val tableLocation = locationMethod.invoke(table).asInstanceOf[String]
Some(tableLocation)
} catch {
case _: Exception =>
Some(metadataLocation)
}
}
.getOrElse(metadataLocation)
} else {
metadataLocation
}

val effectiveUri = new java.net.URI(effectiveLocation)

val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)

val catalogProperties =
org.apache.comet.serde.operator.CometIcebergNativeScan
.hadoopToIcebergS3Properties(hadoopS3Options)

CometIcebergNativeScanMetadata
.extract(scanExec.scan, metadataLocation, catalogProperties)
val result = CometIcebergNativeScanMetadata
.extract(scanExec.scan, effectiveLocation, catalogProperties)

result
} catch {
case e: Exception =>
logError(
Expand All @@ -319,21 +353,18 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com

// Now perform all validation using the pre-extracted metadata
// Check if table uses a FileIO implementation compatible with iceberg-rust

val fileIOCompatible = IcebergReflection.getFileIO(metadata.table) match {
case Some(fileIO) =>
val fileIOClassName = fileIO.getClass.getName
if (fileIOClassName == "org.apache.iceberg.inmemory.InMemoryFileIO") {
fallbackReasons += "Comet does not support InMemoryFileIO table locations"
false
} else {
true
}
case Some(_) =>
// InMemoryFileIO is now supported with table location fallback for REST catalogs
true
case None =>
fallbackReasons += "Could not check FileIO compatibility"
false
}

// Check Iceberg table format version

val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) =>
if (formatVersion > 2) {
Expand Down
Loading
Loading