Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions spark/v3.4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}

compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.7.0"

implementation libs.parquet.column
implementation libs.parquet.hadoop
Expand Down Expand Up @@ -186,7 +186,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
testImplementation libs.junit.vintage.engine
testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.7.0"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
Expand Down Expand Up @@ -302,7 +302,6 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a side effect for downstream projects?

relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class SmokeTest extends SparkExtensionsTestBase {
Expand All @@ -44,7 +45,7 @@ public void dropTable() {
// Run through our Doc's Getting Started Example
// TODO Update doc example so that it can actually be run, modifications were required for this
// test suite to run
@Test
@Ignore
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ignoring this test?

public void testGettingStarted() throws IOException {
// Creating a table
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void reset() {
}

this.importer = new CometSchemaImporter(new RootAllocator());
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
this.initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
setDelegate(
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
true /* useDecimal128 = true */,
Copy link
Copy Markdown
Member

@manuzhang manuzhang Apr 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a UT to catch this change?

false /* isConstant */);
this.isDeleted = new boolean[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ private static class PositionColumnReader extends MetadataColumnReader {

PositionColumnReader(ColumnDescriptor descriptor) {
super(
DataTypes.LongType,
descriptor,
false /* useDecimal128 = false */,
false /* isConstant */);
DataTypes.LongType, descriptor, true /* useDecimal128 = true */, false /* isConstant */);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private OrcBatchReadConf orcBatchReadConf() {
// - Parquet vectorization is enabled
// - only primitives or metadata columns are projected
// - all tasks are of FileScanTask type and read only Parquet files
private boolean useParquetBatchReads() {
boolean useParquetBatchReads() {
return readConf.parquetVectorizationEnabled()
&& expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -41,8 +42,10 @@
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructLikeSet;
Expand Down Expand Up @@ -171,10 +174,17 @@ protected Set<PartitionSpec> specs() {

protected synchronized List<T> tasks() {
if (tasks == null) {
boolean hasDeletes = false;
try (CloseableIterable<? extends ScanTask> taskIterable = scan.planFiles()) {
List<T> plannedTasks = Lists.newArrayList();

for (ScanTask task : taskIterable) {
if (task instanceof FileScanTask) {
if (!((FileScanTask) task).deletes().isEmpty()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have TableScanUtil.hasDeletes

hasDeletes = true;
}
}

ValidationException.check(
taskJavaClass().isInstance(task),
"Unsupported task type, expected a subtype of %s: %",
Expand All @@ -185,6 +195,11 @@ protected synchronized List<T> tasks() {
}

this.tasks = plannedTasks;
if (hasDeletes) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a UT?

this.sparkSession()
.conf()
.set(SparkSQLProperties.PARQUET_READER_TYPE, ParquetReaderType.ICEBERG.name());
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close scan: " + scan, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.comet.parquet.SupportsComet;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
Expand All @@ -37,6 +38,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
Expand Down Expand Up @@ -95,7 +97,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class SparkScan implements Scan, SupportsReportStatistics {
abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
private static final String NDV_KEY = "ndv";

Expand Down Expand Up @@ -157,6 +159,10 @@ protected Types.StructType groupingKeyType() {
return Types.StructType.of();
}

protected SparkSession sparkSession() {
return spark;
}

protected abstract List<? extends ScanTaskGroup<?>> taskGroups();

@Override
Expand Down Expand Up @@ -251,6 +257,16 @@ protected Statistics estimateStatistics(Snapshot snapshot) {
return new Stats(sizeInBytes, rowsCount, colStatsMap);
}

@Override
public boolean isCometEnabled() {
if (readConf.parquetReaderType() == ParquetReaderType.COMET) {
SparkBatch batch = (SparkBatch) this.toBatch();
return batch.useParquetBatchReads();
}

return false;
}

private long totalRecords(Snapshot snapshot) {
Map<String, String> summary = snapshot.summary();
return PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@

import java.util.List;
import java.util.Objects;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;

Expand All @@ -51,15 +54,30 @@ class SparkStagedScan extends SparkScan {
@Override
protected List<ScanTaskGroup<ScanTask>> taskGroups() {
if (taskGroups == null) {
boolean hasDeletes = false;
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
List<ScanTask> tasks = taskSetManager.fetchTasks(table(), taskSetId);

for (ScanTask task : tasks) {
if (task instanceof FileScanTask) {
if (!((FileScanTask) task).deletes().isEmpty()) {
hasDeletes = true;
}
}
}

ValidationException.check(
tasks != null,
"Task set manager has no tasks for table %s with task set ID %s",
table(),
taskSetId);

this.taskGroups = TableScanUtil.planTaskGroups(tasks, splitSize, splitLookback, openFileCost);
if (hasDeletes) {
this.sparkSession()
.conf()
.set(SparkSQLProperties.PARQUET_READER_TYPE, ParquetReaderType.ICEBERG.name());
}
}
return taskGroups;
}
Expand Down