diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index d778212392..d242596a91 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -19,8 +19,89 @@
 
 package org.apache.comet.iceberg
 
+import java.lang.reflect.Method
+
 import org.apache.spark.internal.Logging
 
+/** Cached Iceberg classes and methods to avoid repeated reflection lookups. */
+case class IcebergReflectionCache(
+    contentScanTaskClass: Class[_],
+    fileScanTaskClass: Class[_],
+    contentFileClass: Class[_],
+    deleteFileClass: Class[_],
+    schemaParserClass: Class[_],
+    schemaClass: Class[_],
+    partitionSpecParserClass: Class[_],
+    partitionSpecClass: Class[_],
+    fileMethod: Method,
+    startMethod: Method,
+    lengthMethod: Method,
+    partitionMethod: Method,
+    residualMethod: Method,
+    taskSchemaMethod: Method,
+    deletesMethod: Method,
+    specMethod: Method,
+    schemaToJsonMethod: Method,
+    specToJsonMethod: Method,
+    deleteContentMethod: Method,
+    deleteSpecIdMethod: Method,
+    deleteEqualityIdsMethod: Method)
+
+object IcebergReflectionCache extends Logging {
+
+  def create(): IcebergReflectionCache = {
+    // scalastyle:off classforname
+    val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
+    val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
+    val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
+    val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
+    val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
+    val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
+    val partitionSpecParserClass =
+      Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
+    val partitionSpecClass = Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC)
+    // scalastyle:on classforname
+
+    val fileMethod = contentScanTaskClass.getMethod("file")
+    val startMethod = contentScanTaskClass.getMethod("start")
+    val lengthMethod = contentScanTaskClass.getMethod("length")
+    val partitionMethod = contentScanTaskClass.getMethod("partition")
+    val residualMethod = contentScanTaskClass.getMethod("residual")
+    val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
+    val deletesMethod = fileScanTaskClass.getMethod("deletes")
+    val specMethod = fileScanTaskClass.getMethod("spec")
+    val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
+    schemaToJsonMethod.setAccessible(true)
+    val specToJsonMethod = partitionSpecParserClass.getMethod("toJson", partitionSpecClass)
+    val deleteContentMethod = deleteFileClass.getMethod("content")
+    val deleteSpecIdMethod = deleteFileClass.getMethod("specId")
+    val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds")
+
+    IcebergReflectionCache(
+      contentScanTaskClass = contentScanTaskClass,
+      fileScanTaskClass = fileScanTaskClass,
+      contentFileClass = contentFileClass,
+      deleteFileClass = deleteFileClass,
+      schemaParserClass = schemaParserClass,
+      schemaClass = schemaClass,
+      partitionSpecParserClass = partitionSpecParserClass,
+      partitionSpecClass = partitionSpecClass,
+      fileMethod = fileMethod,
+      startMethod = startMethod,
+      lengthMethod = lengthMethod,
+      partitionMethod = partitionMethod,
+      residualMethod = residualMethod,
+      taskSchemaMethod = taskSchemaMethod,
+      deletesMethod = deletesMethod,
+      specMethod = specMethod,
+      schemaToJsonMethod = schemaToJsonMethod,
+      specToJsonMethod = specToJsonMethod,
+      deleteContentMethod = deleteContentMethod,
+      deleteSpecIdMethod = deleteSpecIdMethod,
+      deleteEqualityIdsMethod = deleteEqualityIdsMethod)
+  }
+}
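+
+// Usage sketch (illustrative; `task` stands for a hypothetical Iceberg
+// FileScanTask): build the cache once per scan, then reuse the pre-resolved
+// Method handles for every task, e.g.
+//   val cache = IcebergReflectionCache.create()
+//   val dataFile = cache.fileMethod.invoke(task)
+//   val start = cache.startMethod.invoke(task).asInstanceOf[Long]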
+
 /**
  * Shared reflection utilities for Iceberg operations.
  *
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index c86b2a51bb..8f39a3e309 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceR
 import org.apache.spark.sql.types._
 
 import org.apache.comet.ConfigEntry
-import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
+import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection, IcebergReflectionCache}
 import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField}
@@ -218,24 +218,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     }
   }
 
-  /**
-   * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication).
-   */
   private def extractDeleteFilesList(
       task: Any,
-      contentFileClass: Class[_],
-      fileScanTaskClass: Class[_]): Seq[OperatorOuterClass.IcebergDeleteFile] = {
+      cache: IcebergReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = {
     try {
-      // scalastyle:off classforname
-      val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
-      // scalastyle:on classforname
-
-      val deletes = IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
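+      // deletes() is invoked through the cached Method handle; the null guard
+      // below treats a missing list as "no delete files" for this task.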
+      val deletes = cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+      val deletesList = if (deletes == null) new java.util.ArrayList[Any]() else deletes
 
-      deletes.asScala.flatMap { deleteFile =>
+      deletesList.asScala.flatMap { deleteFile =>
         try {
           IcebergReflection
-            .extractFileLocation(contentFileClass, deleteFile)
+            .extractFileLocation(cache.contentFileClass, deleteFile)
             .map { deletePath =>
               val deleteBuilder = OperatorOuterClass.IcebergDeleteFile.newBuilder()
@@ -243,8 +236,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
               val contentType =
                 try {
-                  val contentMethod = deleteFileClass.getMethod("content")
-                  val content = contentMethod.invoke(deleteFile)
+                  val content = cache.deleteContentMethod.invoke(deleteFile)
                   content.toString match {
                     case IcebergReflection.ContentTypes.POSITION_DELETES =>
                       IcebergReflection.ContentTypes.POSITION_DELETES
@@ -260,8 +252,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
               val specId =
                 try {
-                  val specIdMethod = deleteFileClass.getMethod("specId")
-                  specIdMethod.invoke(deleteFile).asInstanceOf[Int]
+                  cache.deleteSpecIdMethod.invoke(deleteFile).asInstanceOf[Int]
                 } catch {
                   case _: Exception =>
                     0
@@ -269,12 +260,12 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
               deleteBuilder.setPartitionSpecId(specId)
 
               try {
-                val equalityIdsMethod =
-                  deleteFileClass.getMethod("equalityFieldIds")
-                val equalityIds = equalityIdsMethod
+                val equalityIds = cache.deleteEqualityIdsMethod
                   .invoke(deleteFile)
                   .asInstanceOf[java.util.List[Integer]]
-                equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+                if (equalityIds != null) {
+                  equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+                }
               } catch {
                 case _: Exception =>
               }
@@ -297,38 +288,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     }
   }
 
-  /**
-   * Serializes partition spec and data from an Iceberg FileScanTask.
-   *
-   * Extracts partition specification (field definitions and transforms) and partition data
-   * (actual values) from the task. This information is used by the native execution engine to
-   * build a constants_map for identity-transformed partition columns and to handle
-   * partition-level filtering.
-   */
   private def serializePartitionData(
       task: Any,
-      contentScanTaskClass: Class[_],
-      fileScanTaskClass: Class[_],
+      cache: IcebergReflectionCache,
       taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder,
       commonBuilder: OperatorOuterClass.IcebergScanCommon.Builder,
       partitionTypeToPoolIndex: mutable.HashMap[String, Int],
       partitionSpecToPoolIndex: mutable.HashMap[String, Int],
       partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = {
     try {
-      val specMethod = fileScanTaskClass.getMethod("spec")
-      val spec = specMethod.invoke(task)
+      val spec = cache.specMethod.invoke(task)
 
       if (spec != null) {
         // Deduplicate partition spec
         try {
-          // scalastyle:off classforname
-          val partitionSpecParserClass =
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
-          val toJsonMethod = partitionSpecParserClass.getMethod(
-            "toJson",
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC))
-          // scalastyle:on classforname
-          val partitionSpecJson = toJsonMethod
+          val partitionSpecJson = cache.specToJsonMethod
             .invoke(null, spec)
             .asInstanceOf[String]
@@ -345,8 +319,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
         }
 
         // Get partition data from the task (via file().partition())
-        val partitionMethod = contentScanTaskClass.getMethod("partition")
-        val partitionData = partitionMethod.invoke(task)
+        val partitionData = cache.partitionMethod.invoke(task)
 
         if (partitionData != null) {
           // Get the partition type/schema from the spec
@@ -770,25 +743,9 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       commonBuilder.addRequiredSchema(field.build())
     }
 
-    // Load Iceberg classes once (avoid repeated class loading in loop)
-    // scalastyle:off classforname
-    val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
-    val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
-    val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
-    val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
-    val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
-    // scalastyle:on classforname
-
-    // Cache method lookups (avoid repeated getMethod in loop)
-    val fileMethod = contentScanTaskClass.getMethod("file")
-    val startMethod = contentScanTaskClass.getMethod("start")
-    val lengthMethod = contentScanTaskClass.getMethod("length")
-    val residualMethod = contentScanTaskClass.getMethod("residual")
-    val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
-    val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
-    toJsonMethod.setAccessible(true)
+    val cache = IcebergReflectionCache.create()
+    val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]()
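+    // Memoization sketch: fieldIdMappingCache pays the reflective
+    // buildFieldIdMapping cost once per distinct schema object, e.g.
+    //   fieldIdMappingCache.getOrElseUpdate(
+    //     schema, IcebergReflection.buildFieldIdMapping(schema))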
-
 
     // Access inputRDD - safe now, DPP is resolved
     scanExec.inputRDD match {
      case rdd: DataSourceRDD =>
         val partitions = rdd.partitions
@@ -817,10 +774,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
                   val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder()
 
-                  val dataFile = fileMethod.invoke(task)
+                  val dataFile = cache.fileMethod.invoke(task)
 
                   val filePathOpt =
-                    IcebergReflection.extractFileLocation(contentFileClass, dataFile)
+                    IcebergReflection.extractFileLocation(cache.contentFileClass, dataFile)
 
                   filePathOpt match {
                     case Some(filePath) =>
@@ -832,17 +789,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                         throw new RuntimeException(msg)
                       }
 
-                      val start = startMethod.invoke(task).asInstanceOf[Long]
+                      val start = cache.startMethod.invoke(task).asInstanceOf[Long]
                       taskBuilder.setStart(start)
 
-                      val length = lengthMethod.invoke(task).asInstanceOf[Long]
+                      val length = cache.lengthMethod.invoke(task).asInstanceOf[Long]
                       taskBuilder.setLength(length)
 
-                      val taskSchema = taskSchemaMethod.invoke(task)
+                      val taskSchema = cache.taskSchemaMethod.invoke(task)
 
-                      val deletes =
-                        IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
-                      val hasDeletes = !deletes.isEmpty
+                      val deletes = cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+                      val deletesList = if (deletes == null) new java.util.ArrayList[Any]() else deletes
+                      val hasDeletes = !deletesList.isEmpty
 
                       val schema: AnyRef =
                         if (hasDeletes) {
@@ -869,13 +826,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                       val schemaIdx = schemaToPoolIndex.getOrElseUpdate(
                         schema, {
                           val idx = schemaToPoolIndex.size
-                          val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String]
+                          val schemaJson =
+                            cache.schemaToJsonMethod.invoke(null, schema).asInstanceOf[String]
                           commonBuilder.addSchemaPool(schemaJson)
                           idx
                         })
                       taskBuilder.setSchemaIdx(schemaIdx)
 
-                      val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema)
+                      // Use cached field ID mapping to avoid repeated reflection per-task
+                      val nameToFieldId = fieldIdMappingCache.getOrElseUpdate(
+                        schema,
+                        IcebergReflection.buildFieldIdMapping(schema))
 
                       val projectFieldIds = output.flatMap { attr =>
                         nameToFieldId
@@ -898,8 +859,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                         })
                       taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx)
 
-                      val deleteFilesList =
-                        extractDeleteFilesList(task, contentFileClass, fileScanTaskClass)
+                      val deleteFilesList = extractDeleteFilesList(task, cache)
                       if (deleteFilesList.nonEmpty) {
                         val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate(
                           deleteFilesList, {
@@ -914,7 +874,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
                       val residualExprOpt =
                         try {
-                          val residualExpr = residualMethod.invoke(task)
+                          val residualExpr = cache.residualMethod.invoke(task)
                           val catalystExpr = convertIcebergExpression(residualExpr, output)
                           catalystExpr.flatMap { expr =>
                             exprToProto(expr, output, binding = false)
@@ -939,8 +899,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
                       serializePartitionData(
                         task,
-                        contentScanTaskClass,
-                        fileScanTaskClass,
+                        cache,
                         taskBuilder,
                         commonBuilder,
                         partitionTypeToPoolIndex,
diff --git a/spark/src/test/scala/org/apache/comet/iceberg/IcebergReflectionCacheSuite.scala b/spark/src/test/scala/org/apache/comet/iceberg/IcebergReflectionCacheSuite.scala
new file mode 100644
index 0000000000..4dc82ad226
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/iceberg/IcebergReflectionCacheSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.iceberg
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class IcebergReflectionCacheSuite extends AnyFunSuite {
+
+  private def icebergAvailable: Boolean = {
+    try {
+      Class.forName("org.apache.iceberg.ContentScanTask")
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+  }
+
+  test("IcebergReflectionCache.create() loads all Iceberg classes") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val cache = IcebergReflectionCache.create()
+
+    assert(cache.contentScanTaskClass != null)
+    assert(cache.fileScanTaskClass != null)
+    assert(cache.contentFileClass != null)
+    assert(cache.deleteFileClass != null)
+    assert(cache.schemaParserClass != null)
+    assert(cache.schemaClass != null)
+    assert(cache.partitionSpecParserClass != null)
+    assert(cache.partitionSpecClass != null)
+
+    assert(cache.contentScanTaskClass.getName == IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
+    assert(cache.fileScanTaskClass.getName == IcebergReflection.ClassNames.FILE_SCAN_TASK)
+    assert(cache.contentFileClass.getName == IcebergReflection.ClassNames.CONTENT_FILE)
+    assert(cache.deleteFileClass.getName == IcebergReflection.ClassNames.DELETE_FILE)
+    assert(cache.schemaClass.getName == IcebergReflection.ClassNames.SCHEMA)
+    assert(cache.partitionSpecClass.getName == IcebergReflection.ClassNames.PARTITION_SPEC)
+  }
+
+  test("IcebergReflectionCache.create() resolves all methods") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val cache = IcebergReflectionCache.create()
+
+    assert(cache.fileMethod != null)
+    assert(cache.startMethod != null)
+    assert(cache.lengthMethod != null)
+    assert(cache.partitionMethod != null)
+    assert(cache.residualMethod != null)
+    assert(cache.taskSchemaMethod != null)
+    assert(cache.deletesMethod != null)
+    assert(cache.specMethod != null)
+    assert(cache.schemaToJsonMethod != null)
+    assert(cache.specToJsonMethod != null)
+    assert(cache.deleteContentMethod != null)
+    assert(cache.deleteSpecIdMethod != null)
+    assert(cache.deleteEqualityIdsMethod != null)
+
+    assert(cache.fileMethod.getName == "file")
+    assert(cache.startMethod.getName == "start")
+    assert(cache.lengthMethod.getName == "length")
+    assert(cache.partitionMethod.getName == "partition")
+    assert(cache.residualMethod.getName == "residual")
+    assert(cache.taskSchemaMethod.getName == "schema")
+    assert(cache.deletesMethod.getName == "deletes")
+    assert(cache.specMethod.getName == "spec")
+    assert(cache.schemaToJsonMethod.getName == "toJson")
+    assert(cache.specToJsonMethod.getName == "toJson")
+    assert(cache.deleteContentMethod.getName == "content")
+    assert(cache.deleteSpecIdMethod.getName == "specId")
+    assert(cache.deleteEqualityIdsMethod.getName == "equalityFieldIds")
+  }
+
+  test("IcebergReflectionCache is reusable across multiple calls") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val cache = IcebergReflectionCache.create()
+
+    assert(cache.contentScanTaskClass != null)
+    assert(cache.fileMethod != null)
+    assert(cache.schemaToJsonMethod != null)
+  }
+
+  test("IcebergReflectionCache schemaToJsonMethod is accessible") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val cache = IcebergReflectionCache.create()
+
+    assert(cache.schemaToJsonMethod.isAccessible)
+  }
+
+  test("Multiple IcebergReflectionCache instances share underlying class references") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    val cache1 = IcebergReflectionCache.create()
+    val cache2 = IcebergReflectionCache.create()
+
+    assert(cache1.contentScanTaskClass != null)
+    assert(cache2.contentScanTaskClass != null)
+
+    assert(cache1.contentScanTaskClass eq cache2.contentScanTaskClass)
+    assert(cache1.fileMethod.getName == cache2.fileMethod.getName)
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala
new file mode 100644
index 0000000000..18e9f5038b
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.comet.CometIcebergNativeScanExec
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+import org.apache.comet.CometConf
+import org.apache.comet.serde.operator.CometIcebergNativeScan
+
+/**
+ * Benchmark for Iceberg FileScanTask serialization performance.
+ *
+ * This benchmark specifically measures the serializePartitions() method, which performs the
+ * heavy reflection work of converting Iceberg Java objects to protobuf.
+ *
+ * Use this to validate performance improvements from reflection caching optimizations (see
+ * GitHub issue #3456).
+ *
+ * To run this benchmark:
+ * {{{
+ *   SPARK_GENERATE_BENCHMARK_FILES=1 make \
+ *     benchmark-org.apache.spark.sql.benchmark.CometIcebergSerializationBenchmark
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometIcebergSerializationBenchmark-*results.txt".
+ */
+object CometIcebergSerializationBenchmark extends CometBenchmarkBase {
+
+  private def icebergAvailable: Boolean = {
+    try {
+      Class.forName("org.apache.iceberg.catalog.Catalog")
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+  }
+
+  private def withTempIcebergDir(f: File => Unit): Unit = {
+    val dir = Files.createTempDirectory("comet-iceberg-serde-bench").toFile
+    try {
+      f(dir)
+    } finally {
+      def deleteRecursively(file: File): Unit = {
+        if (file.isDirectory) {
+          Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
+        }
+        file.delete()
+      }
+      deleteRecursively(dir)
+    }
+  }
+
+  private def extractIcebergNativeScanExec(
+      plan: SparkPlan): Option[CometIcebergNativeScanExec] = {
+    val unwrapped = plan match {
+      case aqe: AdaptiveSparkPlanExec => aqe.executedPlan
+      case other => other
+    }
+
+    def find(p: SparkPlan): Option[CometIcebergNativeScanExec] = {
+      p match {
+        case scan: CometIcebergNativeScanExec => Some(scan)
+        case _ => p.children.flatMap(find).headOption
+      }
+    }
+    find(unwrapped)
+  }
+
+  private def createPartitionedIcebergTable(
+      warehouseDir: File,
+      numPartitions: Int,
+      tableName: String = "serde_bench_table"): Unit = {
+    spark.conf.set("spark.sql.catalog.bench_cat", "org.apache.iceberg.spark.SparkCatalog")
+    spark.conf.set("spark.sql.catalog.bench_cat.type", "hadoop")
+    spark.conf.set("spark.sql.catalog.bench_cat.warehouse", warehouseDir.getAbsolutePath)
+
+    val fullTableName = s"bench_cat.db.$tableName"
+
+    spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+    spark.sql("CREATE NAMESPACE IF NOT EXISTS bench_cat.db")
+
+    spark.sql(s"""
+      CREATE TABLE $fullTableName (
+        id BIGINT,
+        name STRING,
+        value DOUBLE,
+        partition_col INT
+      ) USING iceberg
+      PARTITIONED BY (partition_col)
+      TBLPROPERTIES (
+        'format-version'='2',
+        'write.parquet.compression-codec' = 'snappy'
+      )
+    """)
+
+    // scalastyle:off println
+    println(s"Creating Iceberg table with $numPartitions partitions...")
+    // scalastyle:on println
+
+    import spark.implicits._
+
+    val batchSize = 1000
+    var partitionsCreated = 0
+
+    while (partitionsCreated < numPartitions) {
+      val batchEnd = math.min(partitionsCreated + batchSize, numPartitions)
+      val partitionRange = partitionsCreated until batchEnd
+
+      val df = partitionRange
+        .map { p =>
+          (p.toLong, s"name_$p", p * 1.5, p)
+        }
+        .toDF("id", "name", "value", "partition_col")
+
+      df.writeTo(fullTableName).append()
+      partitionsCreated = batchEnd
+
+      if (partitionsCreated % 5000 == 0 || partitionsCreated == numPartitions) {
+        // scalastyle:off println
+        println(s"  Created $partitionsCreated / $numPartitions partitions")
+        // scalastyle:on println
+      }
+    }
+  }
+
+  /**
+   * Benchmarks the serializePartitions() method, which does the heavy reflection work.
+   *
+   * This is the core method that converts Iceberg FileScanTask Java objects to protobuf. The
+   * optimizations from PR #3298 target this code path.
+   */
+  def serializePartitionsBenchmark(numPartitions: Int): Unit = {
+    if (!icebergAvailable) {
+      // scalastyle:off println
+      println("Iceberg not available in classpath, skipping benchmark")
+      // scalastyle:on println
+      return
+    }
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.bench_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.bench_cat.type" -> "hadoop",
+        "spark.sql.catalog.bench_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        createPartitionedIcebergTable(warehouseDir, numPartitions)
+        val fullTableName = "bench_cat.db.serde_bench_table"
+
+        val df = spark.sql(s"SELECT * FROM $fullTableName")
+        val plan = df.queryExecution.executedPlan
+
+        val nativeScanOpt = extractIcebergNativeScanExec(plan)
+
+        nativeScanOpt match {
+          case Some(nativeScan) =>
+            val metadata = nativeScan.nativeIcebergScanMetadata
+            val originalPlan = nativeScan.originalPlan
+            val output = nativeScan.output
+
+            // scalastyle:off println
+            println(s"Found ${metadata.tasks.size()} FileScanTasks")
+            println(s"Output columns: ${output.map(_.name).mkString(", ")}")
+            // scalastyle:on println
+
+            val benchmark = new Benchmark(
+              s"serializePartitions ($numPartitions partitions, ${metadata.tasks.size()} tasks)",
+              numPartitions,
+              output = this.output)
+
+            benchmark.addCase("serializePartitions()") { _ =>
+              CometIcebergNativeScan.serializePartitions(originalPlan, output, metadata)
+            }
+
+            // Measure serialized size
+            val (commonBytes, perPartitionBytes) =
+              CometIcebergNativeScan.serializePartitions(originalPlan, output, metadata)
+
+            val totalBytes = commonBytes.length + perPartitionBytes.map(_.length).sum
+            val commonKB = commonBytes.length / 1024.0
+            val perPartKB = perPartitionBytes.map(_.length).sum / 1024.0
+            val totalKB = totalBytes / 1024.0
+
+            // scalastyle:off println
+            println(
+              f"Serialized size: common=$commonKB%.1f KB, " +
+                f"per-partition=$perPartKB%.1f KB, total=$totalKB%.1f KB")
+            println(f"Average per partition: ${perPartKB / numPartitions * 1024}%.1f bytes")
+            // scalastyle:on println
+
+            benchmark.run()
+
+          case None =>
+            // scalastyle:off println
+            println("WARNING: Could not find CometIcebergNativeScanExec in query plan")
+            println(s"Plan:\n$plan")
+            // scalastyle:on println
+        }
+
+        spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+      }
+    }
+  }
+
+  /**
+   * Micro-benchmark for reflection operations to isolate their cost. Skipped when Iceberg is
+   * not on the classpath.
+   */
+  def reflectionMicroBenchmark(): Unit = {
+    if (!icebergAvailable) {
+      // scalastyle:off println
+      println("Iceberg not available in classpath, skipping benchmark")
+      // scalastyle:on println
+      return
+    }
+
+    val numOps = 100000
+    val benchmark = new Benchmark("Reflection micro-benchmark", numOps, output = output)
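+
+    // Note (descriptive): the "cached" cases below invoke only hashCode() on
+    // pre-resolved handles, so they serve as a near-zero baseline. Class.forName
+    // on an already-loaded class is answered from the classloader cache, so the
+    // "uncached" cases measure lookup overhead rather than actual class loading.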
+
+    benchmark.addCase("Class.forName() - uncached") { _ =>
+      Class.forName("org.apache.iceberg.ContentScanTask")
+    }
+
+    val cachedClass = Class.forName("org.apache.iceberg.ContentScanTask")
+    benchmark.addCase("Class lookup - cached") { _ =>
+      cachedClass.hashCode()
+    }
+
+    benchmark.addCase("getMethod() - uncached") { _ =>
+      cachedClass.getMethod("file")
+    }
+
+    val cachedMethod = cachedClass.getMethod("file")
+    benchmark.addCase("Method lookup - cached") { _ =>
+      cachedMethod.hashCode()
+    }
+
+    benchmark.run()
+  }
+
+  override def runCometBenchmark(args: Array[String]): Unit = {
+    val numPartitions = if (args.nonEmpty) args(0).toInt else 10000
+
+    // First show the cost of reflection operations
+    runBenchmark("Reflection Micro-benchmark") {
+      reflectionMicroBenchmark()
+    }
+
+    // Then benchmark the full serialization path
+    runBenchmark("Iceberg serializePartitions Benchmark") {
+      serializePartitionsBenchmark(numPartitions)
+    }
+  }
+}