From 8cdda169fafcf25bcbda4c613ed068e2208b01ee Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 16 Jul 2025 10:26:52 -0700 Subject: [PATCH 1/5] fix: clean up Iceberg integration APIs --- .../org/apache/comet/parquet/BatchReader.java | 4 +-- .../java/org/apache/comet/parquet/Utils.java | 33 ++++++++----------- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 663406d0a9..73888d3999 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -183,9 +183,7 @@ public BatchReader( this.taskContext = TaskContext$.MODULE$.get(); } - public BatchReader(AbstractColumnReader[] columnReaders) { - // Todo: set useDecimal128 and useLazyMaterialization - int numColumns = columnReaders.length; + public BatchReader(int numColumns) { this.columnReaders = new AbstractColumnReader[numColumns]; vectors = new CometVector[numColumns]; currentBatch = new ColumnarBatch(vectors); diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index fc7d1ab871..4eb2ac405c 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -32,28 +32,23 @@ public class Utils { /** This method is called from Apache Iceberg. 
*/ public static ColumnReader getColumnReader( - DataType type, - ParquetColumnSpec columnSpec, - CometSchemaImporter importer, - int batchSize, - boolean useDecimal128, - boolean useLazyMaterialization) { + DataType type, + ParquetColumnSpec columnSpec, + CometSchemaImporter importer, + int batchSize, + boolean useDecimal128, + boolean useLazyMaterialization, + boolean useLegacyTimestamp) { ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec); return getColumnReader( - type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true); - } - - public static ColumnReader getColumnReader( - DataType type, - ColumnDescriptor descriptor, - CometSchemaImporter importer, - int batchSize, - boolean useDecimal128, - boolean useLazyMaterialization) { - // TODO: support `useLegacyDateTimestamp` for Iceberg - return getColumnReader( - type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true); + type, + descriptor, + importer, + batchSize, + useDecimal128, + useLazyMaterialization, + useLegacyTimestamp); } public static ColumnReader getColumnReader( From 97cb04b79f58b20941f79cf50840e03e1d70b73d Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 16 Jul 2025 13:37:14 -0700 Subject: [PATCH 2/5] formatting --- .../java/org/apache/comet/parquet/Utils.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index 4eb2ac405c..266bc3e80f 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -32,23 +32,23 @@ public class Utils { /** This method is called from Apache Iceberg. 
*/ public static ColumnReader getColumnReader( - DataType type, - ParquetColumnSpec columnSpec, - CometSchemaImporter importer, - int batchSize, - boolean useDecimal128, - boolean useLazyMaterialization, - boolean useLegacyTimestamp) { + DataType type, + ParquetColumnSpec columnSpec, + CometSchemaImporter importer, + int batchSize, + boolean useDecimal128, + boolean useLazyMaterialization, + boolean useLegacyTimestamp) { ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec); return getColumnReader( - type, - descriptor, - importer, - batchSize, - useDecimal128, - useLazyMaterialization, - useLegacyTimestamp); + type, + descriptor, + importer, + batchSize, + useDecimal128, + useLazyMaterialization, + useLegacyTimestamp); } public static ColumnReader getColumnReader( From 37642e24efa2ae68324c49b4bec4cc38a1435541 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 21 Jul 2025 16:21:35 -0700 Subject: [PATCH 3/5] add setColumnReaders --- .../org/apache/comet/parquet/BatchReader.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 73888d3999..d4d0ded5d9 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -183,13 +183,12 @@ public BatchReader( this.taskContext = TaskContext$.MODULE$.get(); } + // This constructor is used by Iceberg only. The Iceberg side will also call + // initByIceberg to initialize the columnReaders public BatchReader(int numColumns) { this.columnReaders = new AbstractColumnReader[numColumns]; vectors = new CometVector[numColumns]; currentBatch = new ColumnarBatch(vectors); - // This constructor is used by Iceberg only. 
The columnReaders are - // initialized in Iceberg, so no need to call the init() - isInitialized = true; this.taskContext = TaskContext$.MODULE$.get(); this.metrics = new HashMap<>(); } @@ -375,12 +374,15 @@ public void init() throws URISyntaxException, IOException { } } - public void setSparkSchema(StructType schema) { - this.sparkSchema = schema; + // Used by Iceberg only. + public void initByIceberg(AbstractColumnReader[] columnReaders) { + this.columnReaders = columnReaders; + this.isInitialized = true; } - public AbstractColumnReader[] getColumnReaders() { - return columnReaders; + // Used by Iceberg only. + public void setSparkSchema(StructType schema) { + this.sparkSchema = schema; } @Override From 5e6552c6e874a9aa8789388e8441b7670bd0d004 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 21 Jul 2025 23:44:05 -0700 Subject: [PATCH 4/5] address comments --- .../org/apache/comet/parquet/BatchReader.java | 45 ++++++++++++------- .../parquet/IcebergCometBatchReader.java | 44 ++++++++++++++++++ .../java/org/apache/comet/parquet/Utils.java | 18 ++++++++ 3 files changed, 90 insertions(+), 17 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index d4d0ded5d9..b75c9cc5c8 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -99,20 +99,20 @@ public class BatchReader extends RecordReader implements Cl private StructType partitionSchema; private InternalRow partitionValues; private PartitionedFile file; - private final Map metrics; + protected Map metrics; private long rowsRead; - private StructType sparkSchema; + protected StructType sparkSchema; private MessageType requestedSchema; - private CometVector[] vectors; - private AbstractColumnReader[] columnReaders; + protected 
CometVector[] vectors; + protected AbstractColumnReader[] columnReaders; private CometSchemaImporter importer; - private ColumnarBatch currentBatch; + protected ColumnarBatch currentBatch; private Future> prefetchTask; private LinkedBlockingQueue> prefetchQueue; private FileReader fileReader; private boolean[] missingColumns; - private boolean isInitialized; + protected boolean isInitialized; private ParquetMetadata footer; /** The total number of rows across all row groups of the input split. */ @@ -143,7 +143,9 @@ public class BatchReader extends RecordReader implements Cl private boolean useLegacyDateTimestamp; /** The TaskContext object for executing this task. */ - private final TaskContext taskContext; + protected TaskContext taskContext; + + public BatchReader() {} // Only for testing public BatchReader(String file, int capacity) { @@ -183,12 +185,18 @@ public BatchReader( this.taskContext = TaskContext$.MODULE$.get(); } - // This constructor is used by Iceberg only. The Iceberg side will also call - // initByIceberg to initialize the columnReaders - public BatchReader(int numColumns) { + /** + * @deprecated since 0.9.1, will be removed in 0.10.0. + */ + public BatchReader(AbstractColumnReader[] columnReaders) { + // Todo: set useDecimal128 and useLazyMaterialization + int numColumns = columnReaders.length; this.columnReaders = new AbstractColumnReader[numColumns]; vectors = new CometVector[numColumns]; currentBatch = new ColumnarBatch(vectors); + // This constructor is used by Iceberg only. The columnReaders are + // initialized in Iceberg, so no need to call the init() + isInitialized = true; this.taskContext = TaskContext$.MODULE$.get(); this.metrics = new HashMap<>(); } @@ -374,17 +382,20 @@ public void init() throws URISyntaxException, IOException { } } - // Used by Iceberg only. - public void initByIceberg(AbstractColumnReader[] columnReaders) { - this.columnReaders = columnReaders; - this.isInitialized = true; - } - - // Used by Iceberg only. 
+ /** + * @deprecated since 0.9.1, will be removed in 0.10.0. + */ public void setSparkSchema(StructType schema) { this.sparkSchema = schema; } + /** + * @deprecated since 0.9.1, will be removed in 0.10.0. + */ + public AbstractColumnReader[] getColumnReaders() { + return columnReaders; + } + @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java new file mode 100644 index 0000000000..25b8ed2815 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.parquet; + +import java.util.HashMap; + +import org.apache.spark.TaskContext$; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import org.apache.comet.vector.CometVector; + +public class IcebergCometBatchReader extends BatchReader { + public IcebergCometBatchReader(int numColumns, StructType schema) { + this.columnReaders = new AbstractColumnReader[numColumns]; + this.vectors = new CometVector[numColumns]; + this.currentBatch = new ColumnarBatch(vectors); + this.taskContext = TaskContext$.MODULE$.get(); + this.metrics = new HashMap<>(); + this.sparkSchema = schema; + } + + public void init(AbstractColumnReader[] columnReaders) { + this.columnReaders = columnReaders; + this.isInitialized = true; + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index 266bc3e80f..3e2e093a85 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -51,6 +51,24 @@ public static ColumnReader getColumnReader( useLegacyTimestamp); } + /** + * This method is called from Apache Iceberg. + * + * @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec + * instead. 
+ */ + public static ColumnReader getColumnReader( + DataType type, + ColumnDescriptor descriptor, + CometSchemaImporter importer, + int batchSize, + boolean useDecimal128, + boolean useLazyMaterialization) { + // TODO: support `useLegacyDateTimestamp` for Iceberg + return getColumnReader( + type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true); + } + public static ColumnReader getColumnReader( DataType type, ColumnDescriptor descriptor, From af0725d1f5c867266b0aad6e2b3435899facd10c Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 23 Jul 2025 14:53:07 -0700 Subject: [PATCH 5/5] address comments --- common/src/main/java/org/apache/comet/parquet/BatchReader.java | 2 +- .../java/org/apache/comet/parquet/IcebergCometBatchReader.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index b75c9cc5c8..538de4a66d 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -143,7 +143,7 @@ public class BatchReader extends RecordReader implements Cl private boolean useLegacyDateTimestamp; /** The TaskContext object for executing this task. 
*/ - protected TaskContext taskContext; + private TaskContext taskContext; public BatchReader() {} diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java index 25b8ed2815..8953c9d935 100644 --- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java @@ -21,7 +21,6 @@ import java.util.HashMap; -import org.apache.spark.TaskContext$; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -32,7 +31,6 @@ public IcebergCometBatchReader(int numColumns, StructType schema) { this.columnReaders = new AbstractColumnReader[numColumns]; this.vectors = new CometVector[numColumns]; this.currentBatch = new ColumnarBatch(vectors); - this.taskContext = TaskContext$.MODULE$.get(); this.metrics = new HashMap<>(); this.sparkSchema = schema; }