
feat: Encapsulate Parquet objects#1920

Merged
andygrove merged 6 commits into apache:main from huaxingao:parquet_encap
Jun 27, 2025

Conversation

@huaxingao
Contributor

@huaxingao huaxingao commented Jun 20, 2025

Which issue does this PR close?

Closes #1833

Rationale for this change

Iceberg shades Parquet, so Parquet objects cannot be passed from Iceberg to Comet. To work around this problem, this PR encapsulates the Parquet objects.

Here is a summary of the changes. Iceberg currently calls these APIs:

public static ColumnReader getColumnReader(
   DataType type,
   ColumnDescriptor descriptor,
   CometSchemaImporter importer,
   int batchSize,
   boolean useDecimal128,
   boolean useLazyMaterialization) 

ColumnReader.setPageReader(PageReader pageReader)

To encapsulate ColumnDescriptor and PageReader, this PR adds a ParquetColumnSpec and changes the two APIs above to:

public static ColumnReader getColumnReader(
   DataType type,
   ParquetColumnSpec columnSpec,
   CometSchemaImporter importer,
   int batchSize,
   boolean useDecimal128,
   boolean useLazyMaterialization)
// construct a ColumnDescriptor from ParquetColumnSpec

setRowGroupReader(org.apache.comet.parquet.RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
// Will call PageReader pageReader = RowGroupReader.getPageReader(ColumnDescriptor)
// setPageReader(pageReader);

To call setRowGroupReader(org.apache.comet.parquet.RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec), the Iceberg side needs to use Comet's FileReader instead of ParquetFileReader, so it calls FileReader.readNextRowGroup() to get an org.apache.comet.parquet.RowGroupReader instead of Parquet's PageReadStore.

ParquetReadOptions can't be passed directly either, so the relevant settings are passed individually and the ParquetReadOptions is built on the Comet side.
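The key idea behind ParquetColumnSpec is that it carries only plain JVM values (strings, ints) across the Iceberg/Comet boundary, so shading cannot break it. As a rough illustration only — the field names below are assumptions for this sketch, not the actual fields in the PR — such a carrier class might look like:

```java
// Hypothetical sketch of a ParquetColumnSpec-style carrier class. It holds only
// plain String/int values, so no (shaded) Parquet types cross the boundary.
// The field names here are assumptions for illustration.
public class ParquetColumnSpec {
  private final String[] path;            // column path within the schema, e.g. {"a", "b"}
  private final String physicalType;      // Parquet physical type name, e.g. "INT32"
  private final int typeLength;           // for FIXED_LEN_BYTE_ARRAY, else 0
  private final int maxDefinitionLevel;
  private final int maxRepetitionLevel;

  public ParquetColumnSpec(String[] path, String physicalType, int typeLength,
                           int maxDefinitionLevel, int maxRepetitionLevel) {
    this.path = path;
    this.physicalType = physicalType;
    this.typeLength = typeLength;
    this.maxDefinitionLevel = maxDefinitionLevel;
    this.maxRepetitionLevel = maxRepetitionLevel;
  }

  public String[] getPath() { return path; }
  public String getPhysicalType() { return physicalType; }
  public int getTypeLength() { return typeLength; }
  public int getMaxDefinitionLevel() { return maxDefinitionLevel; }
  public int getMaxRepetitionLevel() { return maxRepetitionLevel; }
}
```

On the Comet side, a helper can then rebuild a real ColumnDescriptor from these values against Comet's own (unshaded) Parquet classes.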

What changes are included in this PR?

How are these changes tested?

I did integration test on my local.

@codecov-commenter

codecov-commenter commented Jun 20, 2025

Codecov Report

❌ Patch coverage is 4.59770% with 83 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.11%. Comparing base (f09f8af) to head (ea08f9a).
⚠️ Report is 1150 commits behind head on main.

Files with missing lines Patch % Lines
...main/java/org/apache/comet/parquet/FileReader.java 1.96% 50 Missing ⚠️
.../src/main/java/org/apache/comet/parquet/Utils.java 0.00% 15 Missing ⚠️
...va/org/apache/comet/parquet/ParquetColumnSpec.java 0.00% 14 Missing ⚠️
...in/java/org/apache/comet/parquet/ColumnReader.java 0.00% 3 Missing ⚠️
.../java/org/apache/comet/parquet/RowGroupReader.java 75.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1920      +/-   ##
============================================
+ Coverage     56.12%   58.11%   +1.98%     
- Complexity      976     1141     +165     
============================================
  Files           119      132      +13     
  Lines         11743    12987    +1244     
  Branches       2251     2407     +156     
============================================
+ Hits           6591     7547     +956     
- Misses         4012     4217     +205     
- Partials       1140     1223      +83     

☔ View full report in Codecov by Sentry.

@huaxingao huaxingao closed this Jun 20, 2025
@huaxingao huaxingao reopened this Jun 20, 2025
@huaxingao huaxingao force-pushed the parquet_encap branch 2 times, most recently from b0ca78f to 36b4890 Compare June 23, 2025 17:15
Comment on lines +129 to +130
/** This method is called from Apache Iceberg. */
public void setRowGroupReader(RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
Member


It would be good to have some unit tests in Comet for the methods intended to be called from Iceberg, so that we catch any regressions in behavior. This could be added as a separate PR so that we don't slow down progress on the integration work.

Member


I filed #1928

@huaxingao
Contributor Author

I have a draft Iceberg PR.

@huaxingao
Contributor Author

cc @andygrove @parthchandra @hsiang-c
Could you please review this PR? Thanks a lot!

@andygrove
Member

andygrove commented Jun 25, 2025

I am testing this locally now. There is still one API call that references a Parquet class, causing Iceberg to fail to compile:

edit: There seems to be a method missing in Comet in this PR:

/home/andy/git/apache/iceberg/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java:222: error: cannot find symbol
        fileReader.setRequestedSchemaFromSpecs(specs);

@andygrove
Member

I added the following method to FileReader locally:

  /** Sets the projected columns to be read later via {@link #readNextRowGroup()} */
  public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> projection) {
    paths.clear();
    for (ParquetColumnSpec columnSpec : projection) {
      ColumnDescriptor col = Utils.buildColumnDescriptor(columnSpec);
      paths.put(ColumnPath.get(col.getPath()), col);
    }
  }

I can now compile Iceberg, but I get an exception at runtime, and I do not yet understand why:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.3
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.27)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
25/06/25 08:12:29 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
25/06/25 08:12:29 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 CreateTable [COMET: CreateTable is not supported]

res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")
25/06/25 08:12:32 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 AppendData [COMET: AppendData is not supported]
+-  LocalTableScan [COMET: LocalTableScan is not supported]

res1: org.apache.spark.sql.DataFrame = []                                       

scala> spark.sql(s"SELECT * from t1").show()
25/06/25 08:12:35 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 CollectLimit [COMET: CollectLimit is not supported]
+- Project
   +-  BatchScan spark_catalog.default.t1 [COMET: Unsupported scan: org.apache.iceberg.spark.source.SparkBatchQueryScan. Comet Scan only supports Parquet and Iceberg Parquet file formats, BatchScan spark_catalog.default.t1 is not supported]

25/06/25 08:12:35 WARN CheckAllocator: More than one DefaultAllocationManager on classpath. Choosing first found
25/06/25 08:12:35 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 32)
java.lang.NoSuchMethodError: 'void org.apache.comet.parquet.FileReader.setRequestedSchemaFromSpecs(java.util.List)'
	at org.apache.iceberg.parquet.CometVectorizedParquetReader$FileIterator.newCometReader(CometVectorizedParquetReader.java:222)

@andygrove
Member

andygrove commented Jun 25, 2025

The NoSuchMethodError error was my mistake. I was building Comet for Spark 3.4 but using the jar for Spark 3.5 when testing. I no longer see any errors, but Comet is not accelerating the Iceberg scan:

COMET: Unsupported scan: org.apache.iceberg.spark.source.SparkBatchQueryScan. Comet Scan only supports Parquet and Iceberg Parquet file formats, BatchScan spark_catalog.default.t1 is not supported

Comet is expecting a scan that implements SupportsComet:

      // Iceberg scan
      case s: SupportsComet =>

Iceberg does not implement this interface, and I see that there is a related TODO comment:

   * <p>TODO: Implement {@link org.apache.comet.parquet.SupportsComet} in SparkScan to convert Spark
   * physical plan to native physical plan for native execution.

@andygrove
Member

In my local copy of Iceberg, I updated SparkBatchQueryScan to implement SupportsComet. When using both the Comet and Iceberg jars on the classpath, Comet is unable to recognize that SparkBatchQueryScan implements SupportsComet, and I think this is because there are two copies of the SupportsComet interface (one in the Comet jar and one in the Iceberg jar).

If I just have the Iceberg jar on the classpath then Comet does try and accelerate the scan, but now I am running into Arrow shading issues.

Spark 3.4.3 uses Arrow 11.0.0
Comet uses Arrow 18.3.0
Iceberg uses Arrow 15.0.2

It is not possible to shade the JNI classes in Arrow because the Java names have to match the function names in the native code, so this is quite challenging to resolve. We may need to look into using class loader isolation.
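The duplicated-interface problem comes from how the JVM defines class identity: a class is identified by its fully qualified name plus its defining class loader, so two loaders each defining SupportsComet produce two distinct, incompatible types. A small self-contained sketch (nothing here is Comet-specific; Marker is a made-up interface) demonstrates this:

```java
import java.io.InputStream;

// Demonstrates that the "same" class defined by two different class loaders is
// two distinct Class objects, so instanceof checks and casts across them fail --
// the root cause of the duplicated SupportsComet problem described above.
public class LoaderIdentityDemo {
  public interface Marker {}

  // A loader that re-defines Marker from its own bytecode instead of
  // delegating to the parent loader.
  public static class IsolatingLoader extends ClassLoader {
    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
      if (name.equals(LoaderIdentityDemo.class.getName() + "$Marker")) {
        try (InputStream in = getClass().getResourceAsStream(
            "/" + name.replace('.', '/') + ".class")) {
          byte[] bytes = in.readAllBytes();
          return defineClass(name, bytes, 0, bytes.length);
        } catch (Exception e) {
          throw new ClassNotFoundException(name, e);
        }
      }
      return super.loadClass(name, resolve);
    }
  }

  public static void main(String[] args) throws Exception {
    Class<?> original = Marker.class;
    Class<?> reloaded = new IsolatingLoader().loadClass(original.getName());
    System.out.println(original.getName().equals(reloaded.getName())); // true: same name
    System.out.println(original == reloaded);                          // false: different identity
  }
}
```

Class-loader isolation would lean on this deliberately — keeping the two copies in separate loaders so they never collide — at the cost of exactly this identity mismatch whenever objects cross the boundary.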

@snmvaughan
Contributor

I'm surprised we don't have a Comet interface which provides the access needed by Comet, in combination with an Iceberg implementation of that interface that wraps the Iceberg details.

Comment thread on common/pom.xml (outdated):

<groupId>org.apache.datafusion</groupId>
<artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-<version>0.9.0-SNAPSHOT</version>
+<version>0.9.0.1-SNAPSHOT</version>
Member


@huaxingao I'm assuming that the version bump was not intentional?

@andygrove
Member

I am now able to get this working end-to-end 🎉

Member

@andygrove andygrove left a comment


LGTM. I am now able to integrate Comet and Iceberg locally. Thanks @huaxingao!

@hsiang-c
Contributor

In my local copy of Iceberg, I updated SparkBatchQueryScan to implement SupportsComet.

@andygrove You can apply the diff to Iceberg 1.8.1 for Comet support. I'll add Iceberg 1.9.1 diff soon.

https://github.com/apache/datafusion-comet/blob/main/dev/diffs/iceberg/1.8.1.diff#L236

ParquetReadOptions options =
buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix);
this.converter = new ParquetMetadataConverter(options);
this.file = HadoopInputFile.fromPath(path, conf);
Contributor

@hsiang-c hsiang-c Jun 25, 2025


Can we use CometInputFile here? According to the doc

/**
 * A Parquet {@link InputFile} implementation that's similar to {@link
 * org.apache.parquet.hadoop.util.HadoopInputFile}, but with optimizations introduced in Hadoop 3.x,
 * for S3 specifically.
 */
public class CometInputFile implements InputFile {
    // omitted
}

Contributor Author


I think we can use CometInputFile. Modified.


Collection<String> readPropertiesToRemove =
Sets.newHashSet(
"parquet.read.filter",
Contributor

@hsiang-c hsiang-c Jun 25, 2025


Could we use these constants since they are not in the method signature?

ParquetInputFormat.UNBOUND_RECORD_FILTER,
ParquetInputFormat.FILTER_PREDICATE,
ParquetInputFormat.READ_SUPPORT_CLASS,
EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME

Contributor Author


Changed

@andygrove
Member

In my local copy of Iceberg, I updated SparkBatchQueryScan to implement SupportsComet. When using both the Comet and Iceberg jars on the classpath, Comet is unable to recognize that SparkBatchQueryScan implements SupportsComet, and I think this is because there are two copies of the SupportsComet interface (one in the Comet jar and one in the Iceberg jar).

If I just have the Iceberg jar on the classpath then Comet does try and accelerate the scan, but now I am running into Arrow shading issues.

Spark 3.4.3 uses Arrow 11.0.0 Comet uses Arrow 18.3.0 Iceberg uses Arrow 15.0.2

It is not possible to shade the JNI classes in Arrow because the Java names have to match the function names in the native code, so this is quite challenging to resolve. We may need to look into using class loader isolation.

I filed a follow on issue for this - #1934

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.compress.utils.Sets;
Contributor

@hsiang-c hsiang-c Jun 26, 2025


(nit) Perhaps we could use built-in Set.

Suggested change
import org.apache.commons.compress.utils.Sets;
import java.util.Set;

Iceberg prefers import org.apache.iceberg.relocated.com.google.common.collect.Sets; but Comet doesn't use it.

// Iceberg removes these read properties when building the ParquetReadOptions.
// We want to build the exact same ParquetReadOptions as Iceberg does.
Collection<String> readPropertiesToRemove =
Sets.newHashSet(
Contributor


Suggested change
Sets.newHashSet(
Set.of(

Contributor

@hsiang-c hsiang-c left a comment


LGTM, thanks Huaxin.

@parthchandra
Contributor

Perhaps we could simplify the setPageReader conflict considerably by implementing this in org.apache.comet.parquet.ColumnReader:

  public void setPageReader(Object pageReader) throws IOException {
    if (!(pageReader instanceof PageReader)) {
      throw new IllegalArgumentException("pageReader is not of type PageReader");
    }
    setPageReader((PageReader) pageReader);
  }

This shifts the burden of type checking to runtime instead of compile time but sidesteps the shading issue.

We would then not need to make any changes related to calling setRowGroupReader.

@parthchandra
Contributor

The same tactic might work for getColumnReader too.

@parthchandra
Contributor

On second thought, the check pageReader instanceof PageReader is never going to pass, because the object passed in will be an Iceberg-shaded PageReader, which is not the same type. So the shading issue will not really be avoided.
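To make this failure mode concrete: shading rewrites the package of the copied class, so the Iceberg-side object's class has a different fully qualified name and can never satisfy instanceof against Comet's unshaded type. A toy sketch (the class names are stand-ins, not real Parquet types):

```java
// Toy illustration of why the instanceof check cannot pass with a shaded class.
// ShadedPageReader plays the role of Iceberg's shaded copy of PageReader: even
// if both classes were compiled from identical source, they are unrelated types.
public class ShadingDemo {
  public static class PageReader {}        // stands in for Comet's expected (unshaded) type
  public static class ShadedPageReader {}  // same shape, different fully qualified name

  public static void main(String[] args) {
    Object fromIceberg = new ShadedPageReader();
    // instanceof is resolved against the exact class hierarchy, so this prints false:
    System.out.println(fromIceberg instanceof PageReader);
  }
}
```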

@parthchandra
Contributor

OK. Verified that this, along with the corresponding PR in Iceberg, works correctly:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.3
      /_/

Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.19)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
25/06/27 09:46:48 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
25/06/27 09:46:48 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 CreateTable [COMET: CreateTable is not supported]

res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")
25/06/27 09:46:53 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 AppendData [COMET: AppendData is not supported]
+-  LocalTableScan [COMET: LocalTableScan is not supported]

res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"select * from t1").show()
25/06/27 09:47:04 WARN CheckAllocator: More than one DefaultAllocationManager on classpath. Choosing first found
+---+---+
| c0| c1|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+---+---+
only showing top 20 rows


scala>

Contributor

@parthchandra parthchandra left a comment


This looks good to be merged.
I have one question about potentially reading the footer twice, which can be expensive. Can we verify that we are not reading it multiple times?

this.cometOptions = cometOptions;
this.metrics = null;
try {
this.footer = readFooter(file, options, f, converter);
Contributor


The footer should be available already in Iceberg? Can we avoid having to read it twice?

Contributor Author


Yes, the footer is already available from Iceberg, but I can't pass ParquetMetadata from Iceberg to Comet. Maybe I can convert the ParquetMetadata to a JSON string, pass that to Comet, and then construct a ParquetMetadata from the JSON string.

Contributor Author


I initially thought I could do something like this:

       ParquetMetadata footer = reader.getFooter();
       String footerInJson = ParquetMetadata.toJSON(footer);
       ParquetMetadata footer2 = ParquetMetadata.fromJSON(footerInJson);

but this doesn't work. I got:

java.lang.RuntimeException: shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.apache.parquet.hadoop.metadata.ParquetMetadata` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (StringReader); line: 1, column: 2]

I will see if there are other ways to reconstruct ParquetMetadata.
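The Jackson error above is about deserialization, not serialization: ParquetMetadata exposes no default constructor (and no @JsonCreator), so Jackson has no way to instantiate it. A minimal reflection sketch of the root cause — FooterLike is a made-up stand-in, not a real Parquet class:

```java
// Minimal sketch of the root cause of the "no Creators ... exist" error above:
// Jackson's default bean deserialization needs a no-arg constructor (or an
// explicit creator), and a class whose only constructor takes its component
// parts -- as ParquetMetadata's does -- has neither.
public class NoCreatorDemo {
  public static class FooterLike {
    final String schema;
    public FooterLike(String schema) {  // the only constructor takes arguments
      this.schema = schema;
    }
  }

  public static void main(String[] args) {
    try {
      FooterLike.class.getDeclaredConstructor();  // look for a no-arg creator
      System.out.println("has a no-arg constructor");
    } catch (NoSuchMethodException e) {
      // This is what leaves Jackson with no way to instantiate the object.
      System.out.println("no no-arg constructor: default deserialization fails");
    }
  }
}
```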

@andygrove
Member

@huaxingao Could you merge latest from main to fix the clippy issues?

@parthchandra
Contributor

Wrote up some notes on this -
https://docs.google.com/document/d/1Jve1RZ5rJD7i83x3PpzI8t3FojcqVM3pJqRCVLbgVL8/edit?tab=t.0

@andygrove andygrove merged commit ded4022 into apache:main Jun 27, 2025
97 of 98 checks passed
@huaxingao
Contributor Author

Thanks @andygrove @hsiang-c @parthchandra

@huaxingao huaxingao deleted the parquet_encap branch June 27, 2025 21:28
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025


Development

Successfully merging this pull request may close these issues.

Iceberg integration - parquet-column version conflicts
