Iceberg integration - parquet-column version conflicts #1833

@tglanz

Description

Describe the bug

Trying to insert into and query from an Iceberg table as described in https://github.com/apache/datafusion-comet/blob/main/docs/source/user-guide/iceberg.md, I encounter the following error during insertion:

java.lang.NoSuchMethodError: 'org.apache.parquet.column.ParquetProperties$Builder org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled(java.lang.String, boolean)'

This appears to be a parquet-column version conflict between 1.15.2 and 1.13.1, interacting with the relocation (shading) of the jar:

  • Without the relocation, the insert fails but the select works, because Spark loads parquet-column 1.13.1, which lacks the required method.
  • With the relocation, the insert works but the select fails, because the shaded type is no longer part of the expected function signature.

(Exact details below.)
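When a NoSuchMethodError hints at a classpath conflict like this, a useful first step is asking the JVM which jar actually supplied the class. A minimal diagnostic sketch (not part of the reproduction; in a spark-shell you would pass `org.apache.parquet.column.ParquetProperties` as the class name):

```java
// Hypothetical diagnostic helper: reports where a class was loaded from.
// A NoSuchMethodError usually means the jar reported here is an older
// version than the one the caller was compiled against.
public class WhichJar {
    static String sourceOf(Class<?> cls) {
        var src = cls.getProtectionDomain().getCodeSource();
        // Bootstrap (JDK) classes have no code source.
        return src == null ? "bootstrap/JDK" : src.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // e.g. java WhichJar org.apache.parquet.column.ParquetProperties
        String cls = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(cls + " loaded from: " + sourceOf(Class.forName(cls)));
    }
}
```

Running this inside the Spark driver (or via `Class.forName(...)` in the spark-shell) shows whether Spark's bundled parquet-column 1.13.1 or Iceberg's 1.15.2 won the classpath race.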

Steps to reproduce

Build comet

make release

Apply the following patch to Iceberg (as described in the guide linked above):

diff --git a/gradle.properties b/gradle.properties
index 5da56c59d..bd6e35e2f 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,7 +18,7 @@ jmhJsonOutputPath=build/reports/jmh/results.json
 jmhIncludeRegex=.*
 systemProp.defaultFlinkVersions=2.0
 systemProp.knownFlinkVersions=1.19,1.20,2.0
-systemProp.defaultSparkVersions=4.0
+systemProp.defaultSparkVersions=3.5
 systemProp.knownSparkVersions=3.4,3.5,4.0
 systemProp.defaultKafkaVersions=3
 systemProp.knownKafkaVersions=3
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 89dd5bf45..79ac91f72 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.39.0"
-comet = "0.8.1"
+comet = "0.9.0-SNAPSHOT"
 datasketches = "6.2.0"
 delta-standalone = "3.3.1"
 delta-spark = "3.3.1"
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index af7a1d74d..426997463 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -301,8 +301,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
     relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
     relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
     relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
-    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
-    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+    // relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+    // relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
     relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
     relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
     relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index a361a7f1b..9021cd5c9 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
@@ -63,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
-    implements SupportsRuntimeV2Filtering {
+    implements SupportsRuntimeV2Filtering, SupportsComet {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);
 
@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
         runtimeFilterExpressions,
         caseSensitive());
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    return true;
+  }
 }

Build iceberg

./gradlew build -x test -x integrationTest

Export COMET_JAR and ICEBERG_JAR accordingly.

Run

$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR,$ICEBERG_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR,$ICEBERG_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR,$ICEBERG_JAR \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.sql.iceberg.parquet.reader-type=COMET \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16g

and run the following program:

scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
25/06/03 16:13:45 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
25/06/03 16:13:45 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 CreateTable [COMET: CreateTable is not supported]

res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 100).map(i => (i, i+3)).mkString(",")};")
25/06/03 16:13:59 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 AppendData [COMET: AppendData is not supported]
+-  LocalTableScan [COMET: LocalTableScan is not supported]

25/06/03 16:14:00 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: 'org.apache.parquet.column.ParquetProperties$Builder org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled(java.lang.String, boolean)'
        at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:446)
        at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:866)
        at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
        at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
        at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
        at org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:112)
        at org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
        at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.<init>(SparkWrite.java:717)
        at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:691)
        at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:668)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:441)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Expected behavior

Iceberg insert and select statements should not fail.

Additional context

Some insights:

  1. The missing method org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled is found in parquet-column version 1.15.2 (iceberg's required version) but not in 1.13.1 (spark v3.5 required version).

  2. If we bring back the relocation of the jars, the insert works:

-// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
-// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'

From some checks, it seems that with the relocation commented out, Spark loads parquet-column 1.13.1, and hence the method is missing.
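A quick way to confirm this without waiting for a write to fail is to probe for the method reflectively. A minimal sketch of the technique (demonstrated on a JDK class here, since parquet-column is not assumed to be on the classpath; against the real classes the probe would be `ParquetProperties$Builder` / `withStatisticsEnabled`):

```java
// Hypothetical helper: checks whether the class that actually got loaded
// exposes a given public method. "false" for a method the caller was
// compiled against corresponds to a NoSuchMethodError at runtime.
public class MethodProbe {
    static boolean hasMethod(String className, String method, Class<?>... params) {
        try {
            Class.forName(className).getMethod(method, params);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // For the bug in this report the probe would be:
        // hasMethod("org.apache.parquet.column.ParquetProperties$Builder",
        //           "withStatisticsEnabled", String.class, boolean.class)
        // which returns true on parquet-column 1.15.2 and false on 1.13.1.
        System.out.println(MethodProbe.hasMethod("java.lang.String", "trim"));
        System.out.println(MethodProbe.hasMethod("java.lang.String", "noSuchMethod"));
    }
}
```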

However, assuming there is already data in the table (see next bullet), the select statement works fine:

scala> spark.sql("SELECT * from t1").show()
25/06/03 16:27:51 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
25/06/03 16:27:51 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 CollectLimit [COMET: CollectLimit is not supported]
+-  Project [COMET: toprettystring is not supported]
   +- CometScanWrapper

+---+---+                                                                       
| c0| c1|
+---+---+
|  0|  3|
|  1|  4|
|  2|  5|
|  3|  6|
|  4|  7|
|  5|  8|
|  6|  9|
|  7| 10|
|  8| 11|
|  9| 12|
| 10| 13|
| 11| 14|
| 12| 15|
| 13| 16|
| 14| 17|
| 15| 18|
| 16| 19|
| 17| 20|
| 18| 21|
| 19| 22|
+---+---+
only showing top 20 rows

  3. Tested with the latest releases and on the current main branches of both projects.

  4. If we keep the relocation, the insert works:

scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 100).map(i => (i, i+3)).mkString(",")};")
25/06/03 16:24:47 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
25/06/03 16:24:47 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 AppendData [COMET: AppendData is not supported]
+-  LocalTableScan [COMET: LocalTableScan is not supported]

res0: org.apache.spark.sql.DataFrame = []     

However, the select statement now fails with

scala> spark.sql("SELECT * from t1").show()
25/06/03 16:25:27 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 CollectLimit [COMET: CollectLimit is not supported]
+-  Project [COMET: toprettystring is not supported]
   +- CometScanWrapper

25/06/03 16:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 22)
java.lang.NoSuchMethodError: 'org.apache.comet.parquet.ColumnReader org.apache.comet.parquet.Utils.getColumnReader(org.apache.spark.sql.types.DataType, org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor, org.apache.comet.CometSchemaImporter, int, boolean, boolean)'
        at org.apache.iceberg.spark.data.vectorized.CometColumnReader.reset(CometColumnReader.java:95)
        at org.apache.iceberg.spark.data.vectorized.CometColumnarBatchReader.setRowGroupInfo(CometColumnarBatchReader.java:81)
        at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.advance(VectorizedParquetReader.java:166)
        at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:139)
        at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:129)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
        at scala.Option.exists(Option.scala:376)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.hasNext(CometBatchScanExec.scala:62)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.cometcolumnartorow_nextBatch_0$(generated.java:30)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:43)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

which, as I understand it, happens because the shaded type org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor is not part of the org.apache.comet.parquet.Utils.getColumnReader function signature, which was compiled against the unshaded org.apache.parquet.column.ColumnDescriptor.

Labels: bug