From 0d177af139e6c88e3af0d7e15ed53ce67c1c9ac4 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Wed, 20 Nov 2024 18:02:49 -0800
Subject: [PATCH 1/2] Spark: remove ROW_POSITION from project schema

---
 .../iceberg/spark/source/BaseBatchReader.java | 17 +----------------
 1 file changed, 1 insertion(+), 16 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 49c43952135c..c05b694a60dc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -32,12 +31,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,21 +81,9 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
-    Schema projectedSchema = requiredSchema;
-    if (hasPositionDelete) {
-      // We need to add MetadataColumns.ROW_POSITION in the schema for
-      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
-      // more after #10107 is merged.
-      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
-      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
-        columns.add(MetadataColumns.ROW_POSITION);
-        projectedSchema = new Schema(columns);
-      }
-    }
 
     return Parquet.read(inputFile)
-        .project(projectedSchema)
+        .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->

From 66ef5b6215fa89ec526425489fbdfb32127ddabe Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Thu, 21 Nov 2024 09:48:55 -0800
Subject: [PATCH 2/2] Also remove ROW_POSITION from project schema in Spark 3.3 and 3.4

---
 .../iceberg/spark/source/BaseBatchReader.java | 17 +----------------
 .../iceberg/spark/source/BaseBatchReader.java | 17 +----------------
 2 files changed, 2 insertions(+), 32 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 49c43952135c..c05b694a60dc 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -32,12 +31,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,21 +81,9 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
-    Schema projectedSchema = requiredSchema;
-    if (hasPositionDelete) {
-      // We need to add MetadataColumns.ROW_POSITION in the schema for
-      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
-      // more after #10107 is merged.
-      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
-      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
-        columns.add(MetadataColumns.ROW_POSITION);
-        projectedSchema = new Schema(columns);
-      }
-    }
 
     return Parquet.read(inputFile)
-        .project(projectedSchema)
+        .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 49c43952135c..c05b694a60dc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -32,12 +31,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,21 +81,9 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
-    Schema projectedSchema = requiredSchema;
-    if (hasPositionDelete) {
-      // We need to add MetadataColumns.ROW_POSITION in the schema for
-      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
-      // more after #10107 is merged.
-      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
-      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
-        columns.add(MetadataColumns.ROW_POSITION);
-        projectedSchema = new Schema(columns);
-      }
-    }
 
     return Parquet.read(inputFile)
-        .project(projectedSchema)
+        .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
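
Reviewer note (not part of the patches above): each of the three BaseBatchReader copies previously widened the Parquet projection by hand whenever position deletes were present, because ReadConf.generateOffsetToStartPos(Schema) needed MetadataColumns.ROW_POSITION in the schema; per the deleted comment, that workaround is unnecessary once #10107 is merged. The sketch below is an illustrative, hypothetical helper showing the logic being removed; the class and method names are mine, not from the codebase.

// Hypothetical helper, for illustration only: roughly the logic each reader used to
// inline before this series. It appends MetadataColumns.ROW_POSITION to the projection
// when it is missing, so offset-to-start-position mapping could find the row position.
import java.util.List;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

final class RowPositionProjection {
  private RowPositionProjection() {}

  static Schema withRowPosition(Schema requiredSchema) {
    // Copy the required columns and add _pos only if it is not already projected.
    List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
    if (!columns.contains(MetadataColumns.ROW_POSITION)) {
      columns.add(MetadataColumns.ROW_POSITION);
      return new Schema(columns);
    }
    return requiredSchema;
  }
}

With the workaround gone, the projected schema is exactly what SparkDeleteFilter.requiredSchema() (or expectedSchema() when there is no delete filter) returns, and the readers call .project(requiredSchema) directly.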