From cd002330477dc5cd8e816e128500eac7d856d17a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 09:06:52 -0600 Subject: [PATCH 01/17] Add wip iceberg docs --- docs/source/user-guide/datasources.md | 21 +++++--- docs/source/user-guide/iceberg.md | 69 +++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 7 deletions(-) create mode 100644 docs/source/user-guide/iceberg.md diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md index 1e5a7df241..2c07673964 100644 --- a/docs/source/user-guide/datasources.md +++ b/docs/source/user-guide/datasources.md @@ -19,29 +19,36 @@ # Supported Spark Data Sources -## Parquet +## File Formats + +### Parquet When `spark.comet.scan.enabled` is enabled, Parquet scans will be performed natively by Comet if all data types in the schema are supported. When this option is not enabled, the scan will fall back to Spark. In this case, enabling `spark.comet.convert.parquet.enabled` will immediately convert the data into Arrow format, allowing native execution to happen after that, but the process may not be efficient. -## CSV +### CSV Comet does not provide native CSV scan, but when `spark.comet.convert.csv.enabled` is enabled, data is immediately converted into Arrow format, allowing native execution to happen after that. -## JSON +### JSON Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately converted into Arrow format, allowing native execution to happen after that. -# Supported Storages +## Data Catalogs + +### Apache Iceberg + +See + +## Supported Storages -## Local -In progress +Comet supports most standard storage systems, such as local file system and object storage. 
-## HDFS +### HDFS Apache DataFusion Comet native reader seamlessly scans files from remote HDFS for [supported formats](#supported-spark-data-sources) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md new file mode 100644 index 0000000000..5b2d8be4d3 --- /dev/null +++ b/docs/source/user-guide/iceberg.md @@ -0,0 +1,69 @@ + + +# Accelerating Apache Iceberg Parquet Scans using Comet (Experimental) + +**Note: Iceberg integration is a work-in-progress. It may be necessary to build Iceberg from +source rather than using available artifacts in Maven** + +Download compatible Spark, Comet, and Iceberg versions from Maven, or compile projects from source. + +- [Comet Artifacts](https://central.sonatype.com/artifact/org.apache.datafusion/comet-spark-spark3.5_2.12) +- [Iceberg Artifacts](https://central.sonatype.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.5_2.12/overview) + +Set `COMET_JAR` and `ICEBERG_JAR` environment variables. + +```shell +export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.8.0.jar +export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar +``` + +Launch Spark Shell: + +```shell +$SPARK_HOME/bin/spark-shell \ + --jars $COMET_JAR,$ICEBERG_JAR \ + --conf spark.driver.extraClassPath=$COMET_JAR,$ICEBERG_JAR \ + --conf spark.executor.extraClassPath=$COMET_JAR,$ICEBERG_JAR \ + --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.spark_catalog.type=hadoop \ + --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \ + --conf spark.plugins=org.apache.spark.CometPlugin \ + --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ + --conf spark.comet.scan.impl=native_iceberg_compat \ + --conf spark.comet.explainFallback.enabled=true \ + --conf spark.memory.offHeap.enabled=true \ + --conf 
spark.memory.offHeap.size=16g +``` + +```shell +scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg") +scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}") +scala> spark.sql(s"SELECT * from t1").show() +``` + +**Note: this is not actually accelerating the read** + +``` +25/04/26 08:52:56 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging): + CollectLimit [COMET: CollectLimit is not supported] ++- Project [COMET: Project is not native because the following children are not native (BatchScan spark_catalog.default.t1)] + +- BatchScan spark_catalog.default.t1 [COMET: Comet Scan only supports Parquet and Iceberg Parquet file formats, BatchScan spark_catalog.default.t1 is not supported] +``` From 1018fd2516e61c530974ece7104639aa1d6f8176 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 09:09:20 -0600 Subject: [PATCH 02/17] add link --- docs/source/user-guide/datasources.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/datasources.md b/docs/source/user-guide/datasources.md index 2c07673964..ddf02770ef 100644 --- a/docs/source/user-guide/datasources.md +++ b/docs/source/user-guide/datasources.md @@ -42,7 +42,7 @@ converted into Arrow format, allowing native execution to happen after that. ### Apache Iceberg -See +See the dedicated [Comet and Iceberg Guide](iceberg.md). 
## Supported Storages From 8640e4f74ae4221b02dcd445ae29cc3eda9926ab Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 09:38:38 -0600 Subject: [PATCH 03/17] link to tracking issue --- .../java/org/apache/comet/parquet/Utils.java | 1 + docs/source/user-guide/iceberg.md | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index f73251e277..36ca0edb81 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -24,6 +24,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.spark.sql.types.*; +//org.apache.comet.shaded.arrow.c.CometSchemaImporter public class Utils { public static ColumnReader getColumnReader( diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 5b2d8be4d3..31f3d0f7c9 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -47,23 +47,29 @@ $SPARK_HOME/bin/spark-shell \ --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \ --conf spark.plugins=org.apache.spark.CometPlugin \ --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.scan.impl=native_iceberg_compat \ + --conf spark.sql.iceberg.parquet.reader-type=COMET \ --conf spark.comet.explainFallback.enabled=true \ --conf spark.memory.offHeap.enabled=true \ --conf spark.memory.offHeap.size=16g ``` +Create an Iceberg table. Note that Comet will not accelerate this part. 
+ ```shell scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg") scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}") +``` + +Comet should now be able to accelarate reading the table: + +```shell scala> spark.sql(s"SELECT * from t1").show() ``` -**Note: this is not actually accelerating the read** +**Note: this currently fails** ``` -25/04/26 08:52:56 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging): - CollectLimit [COMET: CollectLimit is not supported] -+- Project [COMET: Project is not native because the following children are not native (BatchScan spark_catalog.default.t1)] - +- BatchScan spark_catalog.default.t1 [COMET: Comet Scan only supports Parquet and Iceberg Parquet file formats, BatchScan spark_catalog.default.t1 is not supported] +Caused by: java.lang.NoSuchMethodError: 'org.apache.comet.parquet.ColumnReader org.apache.comet.parquet.Utils.getColumnReader(org.apache.spark.sql.types.DataType, org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor, org.apache.comet.shaded.arrow.c.CometSchemaImporter, int, boolean, boolean)' ``` + +Tracking issue: https://github.com/apache/datafusion-comet/issues/1684 From baae1d2ad0332faf1a15ee1c4960ee92493330d2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 09:40:23 -0600 Subject: [PATCH 04/17] revert code change --- common/src/main/java/org/apache/comet/parquet/Utils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index 36ca0edb81..f73251e277 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -24,7 +24,6 @@ import org.apache.parquet.schema.LogicalTypeAnnotation; import 
org.apache.parquet.schema.PrimitiveType; import org.apache.spark.sql.types.*; -//org.apache.comet.shaded.arrow.c.CometSchemaImporter public class Utils { public static ColumnReader getColumnReader( From c42b549d3cb7347a228d063e95419cef034b4256 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 10:15:02 -0600 Subject: [PATCH 05/17] fix shading --- common/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/common/pom.xml b/common/pom.xml index 36bc706053..40fefbdcef 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -149,6 +149,7 @@ under the License. + org/apache/arrow/c/Comet** org/apache/arrow/c/jni/JniWrapper org/apache/arrow/c/jni/PrivateData org/apache/arrow/c/jni/CDataJniException From 2d706092255d6a157f5b8ed27d25f735ff86d72e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 10:15:33 -0600 Subject: [PATCH 06/17] fix shading --- common/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/pom.xml b/common/pom.xml index 40fefbdcef..d09f070dec 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -147,9 +147,10 @@ under the License. org.apache.arrow ${comet.shade.packageName}.arrow + + org/apache/arrow/c/Comet** - org/apache/arrow/c/Comet** org/apache/arrow/c/jni/JniWrapper org/apache/arrow/c/jni/PrivateData org/apache/arrow/c/jni/CDataJniException From 1ea19ad0a08c19636e40a8aaf46e6ba5a90b3e29 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 10:18:40 -0600 Subject: [PATCH 07/17] update --- docs/source/user-guide/iceberg.md | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 31f3d0f7c9..50f26f0c93 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -19,13 +19,25 @@ # Accelerating Apache Iceberg Parquet Scans using Comet (Experimental) -**Note: Iceberg integration is a work-in-progress. 
It may be necessary to build Iceberg from +**Note: Iceberg integration is a work-in-progress. It is currently necessary to build Iceberg from source rather than using available artifacts in Maven** -Download compatible Spark, Comet, and Iceberg versions from Maven, or compile projects from source. +## Build Comet -- [Comet Artifacts](https://central.sonatype.com/artifact/org.apache.datafusion/comet-spark-spark3.5_2.12) -- [Iceberg Artifacts](https://central.sonatype.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.5_2.12/overview) +```shell +make release +mvn install -DskipTests +``` + +## Build Iceberg + +Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. + +```shell +./gradlew build +``` + +## Test Set `COMET_JAR` and `ICEBERG_JAR` environment variables. @@ -65,11 +77,3 @@ Comet should now be able to accelarate reading the table: ```shell scala> spark.sql(s"SELECT * from t1").show() ``` - -**Note: this currently fails** - -``` -Caused by: java.lang.NoSuchMethodError: 'org.apache.comet.parquet.ColumnReader org.apache.comet.parquet.Utils.getColumnReader(org.apache.spark.sql.types.DataType, org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor, org.apache.comet.shaded.arrow.c.CometSchemaImporter, int, boolean, boolean)' -``` - -Tracking issue: https://github.com/apache/datafusion-comet/issues/1684 From 6c37efc5ea66f2ff902ee319439a01771b403ddc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 10:19:08 -0600 Subject: [PATCH 08/17] update --- docs/source/user-guide/iceberg.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 50f26f0c93..7470076f46 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -42,7 +42,7 @@ Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. Set `COMET_JAR` and `ICEBERG_JAR` environment variables. 
```shell -export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.8.0.jar +export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.9.0-SNAPSHOT.jar export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar ``` From b03dd4cacbf2f48a12fdab0819aab8b89bcd2e6a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 11:47:30 -0600 Subject: [PATCH 09/17] update --- docs/source/user-guide/iceberg.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 7470076f46..5301d60797 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -31,9 +31,21 @@ mvn install -DskipTests ## Build Iceberg -Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. +Clone the Iceberg repository. ```shell +git clone git@github.com:apache/iceberg.git +``` + +It will be necessary to make some small changes to Iceberg: + +- Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. 
+- Replace `import org.apache.comet.sharded.arrow.c.CometSchemaImporter;` with `import org.apache.arrow.c.CometSchemaImporter;` + +Perform a clean build + +```shell +./gradlew clean ./gradlew build ``` From 46512110ea6c015fc17a8a8532d75deb7c54f515 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 15:15:47 -0600 Subject: [PATCH 10/17] refactor to avoid shading --- ....java => AbstractCometSchemaImporter.java} | 4 +-- .../org/apache/comet/CometSchemaImporter.java | 30 +++++++++++++++++++ .../org/apache/comet/parquet/BatchReader.java | 2 +- .../apache/comet/parquet/ColumnReader.java | 2 +- .../comet/parquet/LazyColumnReader.java | 2 +- .../comet/parquet/NativeBatchReader.java | 2 +- .../comet/parquet/NativeColumnReader.java | 2 +- .../java/org/apache/comet/parquet/Utils.java | 3 +- docs/source/user-guide/iceberg.md | 2 +- 9 files changed, 40 insertions(+), 9 deletions(-) rename common/src/main/java/org/apache/arrow/c/{CometSchemaImporter.java => AbstractCometSchemaImporter.java} (95%) create mode 100644 common/src/main/java/org/apache/comet/CometSchemaImporter.java diff --git a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java b/common/src/main/java/org/apache/arrow/c/AbstractCometSchemaImporter.java similarity index 95% rename from common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java rename to common/src/main/java/org/apache/arrow/c/AbstractCometSchemaImporter.java index 32955f1acb..f35196356f 100644 --- a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java +++ b/common/src/main/java/org/apache/arrow/c/AbstractCometSchemaImporter.java @@ -24,12 +24,12 @@ import org.apache.arrow.vector.types.pojo.Field; /** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. 
*/ -public class CometSchemaImporter { +public abstract class AbstractCometSchemaImporter { private final BufferAllocator allocator; private final SchemaImporter importer; private final CDataDictionaryProvider provider = new CDataDictionaryProvider(); - public CometSchemaImporter(BufferAllocator allocator) { + public AbstractCometSchemaImporter(BufferAllocator allocator) { this.allocator = allocator; this.importer = new SchemaImporter(allocator); } diff --git a/common/src/main/java/org/apache/comet/CometSchemaImporter.java b/common/src/main/java/org/apache/comet/CometSchemaImporter.java new file mode 100644 index 0000000000..7dc4b75db6 --- /dev/null +++ b/common/src/main/java/org/apache/comet/CometSchemaImporter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet; + +import org.apache.arrow.c.*; +import org.apache.arrow.memory.BufferAllocator; + +/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. 
*/ +public class CometSchemaImporter extends AbstractCometSchemaImporter { + public CometSchemaImporter(BufferAllocator allocator) { + super(allocator); + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 8bcbcc0d0c..663406d0a9 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.arrow.c.CometSchemaImporter; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.commons.lang3.tuple.Pair; @@ -65,6 +64,7 @@ import org.apache.spark.util.AccumulatorV2; import org.apache.comet.CometConf; +import org.apache.comet.CometSchemaImporter; import org.apache.comet.shims.ShimBatchReader; import org.apache.comet.shims.ShimFileFormat; import org.apache.comet.vector.CometVector; diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index 1927868a1c..9502aa265d 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -27,7 +27,6 @@ import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; -import org.apache.arrow.c.CometSchemaImporter; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; @@ -44,6 +43,7 @@ import org.apache.spark.sql.types.DataType; import org.apache.comet.CometConf; +import org.apache.comet.CometSchemaImporter; import org.apache.comet.vector.CometDecodedVector; import org.apache.comet.vector.CometDictionary; import org.apache.comet.vector.CometDictionaryVector; diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java index dd08a88ab5..b22278ea78 100644 --- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java @@ -21,11 +21,11 @@ import java.io.IOException; -import org.apache.arrow.c.CometSchemaImporter; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; import org.apache.spark.sql.types.DataType; +import org.apache.comet.CometSchemaImporter; import org.apache.comet.vector.CometLazyVector; import org.apache.comet.vector.CometVector; diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index 45fce914f3..466bba4fd7 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.arrow.c.CometSchemaImporter; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.ipc.WriteChannel; @@ -71,6 +70,7 @@ import org.apache.spark.util.AccumulatorV2; import org.apache.comet.CometConf; +import org.apache.comet.CometSchemaImporter; import org.apache.comet.shims.ShimBatchReader; import org.apache.comet.shims.ShimFileFormat; import org.apache.comet.vector.CometVector; diff --git a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java index c358999a54..50838ef771 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java @@ -24,13 +24,13 @@ import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; -import 
org.apache.arrow.c.CometSchemaImporter; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.Type; import org.apache.spark.sql.types.DataType; +import org.apache.comet.CometSchemaImporter; import org.apache.comet.vector.*; // TODO: extend ColumnReader instead of AbstractColumnReader to reduce code duplication diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index f73251e277..c67ef1d0b4 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -19,12 +19,13 @@ package org.apache.comet.parquet; -import org.apache.arrow.c.CometSchemaImporter; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.spark.sql.types.*; +import org.apache.comet.CometSchemaImporter; + public class Utils { public static ColumnReader getColumnReader( DataType type, diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 5301d60797..2951b9c105 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -40,7 +40,7 @@ git clone git@github.com:apache/iceberg.git It will be necessary to make some small changes to Iceberg: - Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. 
-- Replace `import org.apache.comet.sharded.arrow.c.CometSchemaImporter;` with `import org.apache.arrow.c.CometSchemaImporter;` +- Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;` Perform a clean build From 9fb4ec60e83b83adbe6830ed821538b1a74d3819 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 26 Apr 2025 15:16:03 -0600 Subject: [PATCH 11/17] refactor to avoid shading --- docs/source/user-guide/iceberg.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 2951b9c105..9c2cef00f2 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -84,7 +84,7 @@ scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING icebe scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}") ``` -Comet should now be able to accelarate reading the table: +Comet should now be able to accelerate reading the table: ```shell scala> spark.sql(s"SELECT * from t1").show() From 922f711b91450b278aef0d8a9bb27a1250cc3706 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Apr 2025 06:13:40 -0600 Subject: [PATCH 12/17] avoid Parquet types in signature --- .../java/org/apache/comet/parquet/Utils.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index c67ef1d0b4..3a83eb6394 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -27,13 +27,39 @@ import org.apache.comet.CometSchemaImporter; public class Utils { + + /** This method is called from Apache Iceberg. 
*/ public static ColumnReader getColumnReader( DataType type, - ColumnDescriptor descriptor, CometSchemaImporter importer, + String[] path, + String typeName, + int maxRep, + int maxDef, int batchSize, boolean useDecimal128, boolean useLazyMaterialization) { + PrimitiveType.PrimitiveTypeName primitiveTypeName = null; + if (typeName.equalsIgnoreCase("FLOAT")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.FLOAT; + } else if (typeName.equalsIgnoreCase("DOUBLE")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.DOUBLE; + } else if (typeName.equalsIgnoreCase("INT32")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.INT32; + } else if (typeName.equalsIgnoreCase("INT64")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.INT64; + } else if (typeName.equalsIgnoreCase("INT96")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.INT96; + } else if (typeName.equalsIgnoreCase("BOOLEAN")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.BOOLEAN; + } else if (typeName.equalsIgnoreCase("BINARY")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.BINARY; + } else if (typeName.equalsIgnoreCase("FIXED_LEN_BYTE_ARRAY")) { + primitiveTypeName = PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; + } else { + throw new IllegalArgumentException("Unsupported column type: " + typeName); + } + ColumnDescriptor descriptor = new ColumnDescriptor(path, primitiveTypeName, maxRep, maxDef); // TODO: support `useLegacyDateTimestamp` for Iceberg return getColumnReader( type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true); From 0ab6a432f2d6630d75deaee006b7d66d35b1142c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Apr 2025 06:53:18 -0600 Subject: [PATCH 13/17] experimenting --- .../java/org/apache/comet/parquet/Utils.java | 26 +------------------ docs/source/user-guide/iceberg.md | 6 +++++ 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java 
b/common/src/main/java/org/apache/comet/parquet/Utils.java index 3a83eb6394..2f9c507366 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -31,35 +31,11 @@ public class Utils { /** This method is called from Apache Iceberg. */ public static ColumnReader getColumnReader( DataType type, + ColumnDescriptor descriptor, CometSchemaImporter importer, - String[] path, - String typeName, - int maxRep, - int maxDef, int batchSize, boolean useDecimal128, boolean useLazyMaterialization) { - PrimitiveType.PrimitiveTypeName primitiveTypeName = null; - if (typeName.equalsIgnoreCase("FLOAT")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.FLOAT; - } else if (typeName.equalsIgnoreCase("DOUBLE")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.DOUBLE; - } else if (typeName.equalsIgnoreCase("INT32")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.INT32; - } else if (typeName.equalsIgnoreCase("INT64")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.INT64; - } else if (typeName.equalsIgnoreCase("INT96")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.INT96; - } else if (typeName.equalsIgnoreCase("BOOLEAN")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.BOOLEAN; - } else if (typeName.equalsIgnoreCase("BINARY")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.BINARY; - } else if (typeName.equalsIgnoreCase("FIXED_LEN_BYTE_ARRAY")) { - primitiveTypeName = PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; - } else { - throw new IllegalArgumentException("Unsupported column type: " + typeName); - } - ColumnDescriptor descriptor = new ColumnDescriptor(path, primitiveTypeName, maxRep, maxDef); // TODO: support `useLegacyDateTimestamp` for Iceberg return getColumnReader( type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true); diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 
9c2cef00f2..5880e6a4af 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -41,6 +41,12 @@ It will be necessary to make some small changes to Iceberg: - Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. - Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;` +- Stop shading Parquet by commenting out the following lines in the iceberg-spark build: + +``` +// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' +// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' +``` Perform a clean build From 5851aaaeb2e5007ca6049aa9a540bdf60ba48a55 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Apr 2025 06:59:50 -0600 Subject: [PATCH 14/17] update docs --- docs/source/user-guide/iceberg.md | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 5880e6a4af..90d978b1c5 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -24,11 +24,24 @@ source rather than using available artifacts in Maven** ## Build Comet +Run a Maven install so that we can compile Iceberg against latest Comet: + ```shell -make release mvn install -DskipTests ``` +Build the release JAR to be used from Spark: + +```shell +make release +``` + +Set `COMET_JAR` env var: + +```shell +export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.9.0-SNAPSHOT.jar +``` + ## Build Iceberg Clone the Iceberg repository. @@ -52,16 +65,15 @@ Perform a clean build ```shell ./gradlew clean -./gradlew build +./gradlew build -x test -x integrationTest ``` ## Test -Set `COMET_JAR` and `ICEBERG_JAR` environment variables. +Set `ICEBERG_JAR` environment variable. 
```shell -export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.9.0-SNAPSHOT.jar -export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar +export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.10.0-SNAPSHOT.jar ``` Launch Spark Shell: From 58d97856a735f5018b1fe1469861ea2f8db14358 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Apr 2025 07:30:07 -0600 Subject: [PATCH 15/17] update docs --- .../main/java/org/apache/comet/parquet/SupportsComet.java | 1 + docs/source/user-guide/iceberg.md | 1 + .../org/apache/comet/CometSparkSessionExtensions.scala | 7 +++++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/SupportsComet.java b/common/src/main/java/org/apache/comet/parquet/SupportsComet.java index f330a76819..b95c31ce92 100644 --- a/common/src/main/java/org/apache/comet/parquet/SupportsComet.java +++ b/common/src/main/java/org/apache/comet/parquet/SupportsComet.java @@ -19,6 +19,7 @@ package org.apache.comet.parquet; +/** This is implemented in Apache Iceberg */ public interface SupportsComet { boolean isCometEnabled(); } diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md index 90d978b1c5..b46792f3eb 100644 --- a/docs/source/user-guide/iceberg.md +++ b/docs/source/user-guide/iceberg.md @@ -54,6 +54,7 @@ It will be necessary to make some small changes to Iceberg: - Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`. 
 - Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;`
+- Modify `SparkBatchQueryScan` so that it implements the `SupportsComet` interface
 - Stop shading Parquet by commenting out the following lines in the iceberg-spark build:
 
 ```
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 1f969b3f4b..c2297ab7e6 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -230,8 +230,11 @@ class CometSparkSessionExtensions
           withInfo(scanExec, fallbackReasons.mkString(", "))
         }
 
-      case _ =>
-        withInfo(scanExec, "Comet Scan only supports Parquet and Iceberg Parquet file formats")
+      case other =>
+        withInfo(
+          scanExec,
+          s"Unsupported scan: ${other.getClass.getName}. " +
+            "Comet Scan only supports Parquet and Iceberg Parquet file formats")
     }
   }

From ba4c073b80f81f970acb0df4a7973d7615381a89 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 28 Apr 2025 07:33:11 -0600
Subject: [PATCH 16/17] docs

---
 docs/source/user-guide/iceberg.md | 45 ++++++++++++++++++++++++++++---
 1 file changed, 41 insertions(+), 4 deletions(-)

diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md
index b46792f3eb..23293d6c46 100644
--- a/docs/source/user-guide/iceberg.md
+++ b/docs/source/user-guide/iceberg.md
@@ -53,10 +53,10 @@ git clone git@github.com:apache/iceberg.git
 
 It will be necessary to make some small changes to Iceberg:
 
 - Update Gradle files to change Comet version to `0.9.0-SNAPSHOT`.
-- Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;` 
+- Replace `import org.apache.comet.shaded.arrow.c.CometSchemaImporter;` with `import org.apache.comet.CometSchemaImporter;`
 - Modify `SparkBatchQueryScan` so that it implements the `SupportsComet` interface
 - Stop shading Parquet by commenting out the following lines in the iceberg-spark build:
- 
+
 ```
 // relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
 // relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
@@ -98,13 +98,50 @@ $SPARK_HOME/bin/spark-shell \
 
 Create an Iceberg table. Note that Comet will not accelerate this part.
 
-```shell
+```
 scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
 scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")
 ```
 
 Comet should now be able to accelerate reading the table:
 
-```shell
+```
+scala> spark.sql(s"SELECT * from t1").show()
+```
+
+This should produce the following output:
+
+```
 scala> spark.sql(s"SELECT * from t1").show()
+25/04/28 07:29:37 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
+25/04/28 07:29:37 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
+ CollectLimit [COMET: CollectLimit is not supported]
++- Project [COMET: toprettystring is not supported]
+   +- CometScanWrapper
+
++---+---+
+| c0| c1|
++---+---+
+|  0|  0|
+|  1|  1|
+|  2|  2|
+|  3|  3|
+|  4|  4|
+|  5|  5|
+|  6|  6|
+|  7|  7|
+|  8|  8|
+|  9|  9|
+| 10| 10|
+| 11| 11|
+| 12| 12|
+| 13| 13|
+| 14| 14|
+| 15| 15|
+| 16| 16|
+| 17| 17|
+| 18| 18|
+| 19| 19|
++---+---+
+only showing top 20 rows
 ```

From 3bde99444bbc3086d582585c5dada4bc8fa2a7f0 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 28 Apr 2025 07:54:37 -0600
Subject: [PATCH 17/17] revert shading change

---
 common/pom.xml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index d09f070dec..36bc706053 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -147,8 +147,6 @@ under the License.
               <pattern>org.apache.arrow</pattern>
               <shadedPattern>${comet.shade.packageName}.arrow</shadedPattern>
-
-              <exclude>org/apache/arrow/c/Comet**</exclude>
               <exclude>org/apache/arrow/c/jni/JniWrapper</exclude>