Exclude reading _pos column if it's not in the scan list #11390

Merged
szehon-ho merged 8 commits into apache:main from huaxingao:pos
Nov 8, 2024

Conversation

@huaxingao
Contributor

In Spark batch reading, Iceberg reads additional columns when there are delete files. For instance, if we have a table test (int id, string data) and a query SELECT id FROM test, the requested schema only contains the column id. However, to determine which rows are deleted (there is a rowIdMapping for this purpose), Iceberg appends _pos to the requested schema for position deletes, and appends the equality filter column for equality deletes (suppose the equality delete is on column data). As a result, Iceberg builds ColumnarBatchReaders for these extra columns. In the case of position deletes, we don't actually need to read _pos to compute the rowIdMapping, so this PR excludes the _pos column when building the ColumnarBatchReader. For equality deletes, while we do need to read the equality filter column to compute the rowIdMapping, once we have the rowIdMapping we should exclude the values of these extra columns from the ColumnarBatch. I will have a separate PR to fix equality deletes.

In summary:

SELECT id FROM test

For position deletes, the vectorized reader currently returns a ColumnarBatch that contains Arrow vectors for both id and _pos. This PR makes Iceberg skip reading the _pos column, so the returned ColumnarBatch contains an Arrow vector for id only.

For equality deletes (suppose the filter is on the data column), the vectorized reader currently returns a ColumnarBatch that contains Arrow vectors for both id and data. The goal is to return a ColumnarBatch that contains an Arrow vector for id only. I will have a separate PR for this.
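
A minimal sketch of the position-delete part (simplified and illustrative, not the exact PR code): when _pos was appended only for delete bookkeeping, drop it from the schema used to build the vectorized reader.

Schema schemaForVectorization(Schema requiredSchema, Schema expectedSchema, DeleteFilter<?> deleteFilter) {
  if (deleteFilter != null
      && deleteFilter.hasPosDeletes()
      && expectedSchema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
    // _pos is not explicitly selected, so the batch reader can skip it
    List<Types.NestedField> columns =
        requiredSchema.columns().stream()
            .filter(field -> field.fieldId() != MetadataColumns.ROW_POSITION.fieldId())
            .collect(Collectors.toList());
    return new Schema(columns);
  }
  // no deletes: nothing extra was appended; equality deletes: the filter columns must stay
  return requiredSchema;
}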

@github-actions github-actions bot added the spark label Oct 24, 2024
@huaxingao
Contributor Author

cc @szehon-ho @pvary @viirya

@huaxingao
Contributor Author

also cc @flyrain

@pvary
Contributor

pvary commented Oct 25, 2024

@huaxingao: I'm not an expert in the Spark codebase, but I think having a test which fails before the change and succeeds after the change would be nice. Otherwise we risk future PRs changing this behaviour without the reviewers noticing it.

@szehon-ho
Member

@huaxingao it's a good find, I'm just wondering: where do we add _pos to the schema? Can we just not do it there? Just curious if it's possible.

@dramaticlly
Contributor

@huaxingao it's a good find, I'm just wondering: where do we add _pos to the schema? Can we just not do it there? Just curious if it's possible.

I think it might be from here

if (!posDeletes.isEmpty()) {
  requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}

@huaxingao
Contributor Author

@szehon-ho I think we still need _pos in the requiredSchema to build the posAccessor.
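
For context, roughly how that accessor works inside DeleteFilter (simplified; the exact code may differ): the row-position accessor is built from requiredSchema, so _pos must stay in that schema even though the batch reader no longer materializes the column.

this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());

protected long pos(T row) {
  return (Long) posAccessor.get(asStructLike(row));
}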

@huaxingao
Contributor Author

@pvary Thank you for your suggestion! You're correct that adding such a test would help prevent future changes from inadvertently affecting this behavior without notice. Currently, Spark doesn't check the schema when processing batch data, which is why an extra Arrow vector in the ColumnarBatch doesn't cause an error. However, Comet allocates arrays in a pre-allocated list and relies on the requested schema to determine how many columns are in the batch. If extra columns are returned to Comet, it will fail. While we currently don't have a test that fails due to extra columns, the integration of Comet will change this. Once Comet is integrated, the tests involving the Comet reader will fail if extra columns are present. I believe these Comet reader tests will serve as the tests you've suggested we add.
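
To make the failure mode concrete, a hypothetical sketch (not Comet's actual code) of a reader that sizes its output from the requested schema:

// one column for SELECT id FROM test, so the array is sized to 1
ColumnVector[] vectors = new ColumnVector[requestedSchema.columns().size()];
for (int i = 0; i < batch.numCols(); i++) {
  vectors[i] = batch.column(i); // overruns once the batch also carries _pos
}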

// vectorization reader. Before removing _pos, we need to make sure _pos is not explicitly
// selected in the query.
if (deleteFilter != null) {
  if (deleteFilter.hasPosDeletes() && expectedSchema().findType("_pos") == null) {
Member

Do we have a const for _pos?

Contributor Author

Yes, we do. Changed to MetadataColumns.ROW_POSITION.name()
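
With the constant, the check reads roughly:

if (deleteFilter.hasPosDeletes()
    && expectedSchema().findType(MetadataColumns.ROW_POSITION.name()) == null) {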

  fileSchema ->
      VectorizedSparkParquetReaders.buildReader(
-         requiredSchema, fileSchema, idToConstant, deleteFilter))
+         vectorizationSchema, fileSchema, idToConstant, deleteFilter))
Member

Is it just expectedSchema?

Contributor Author

Not exactly.
If there are no deletes, it's expectedSchema.
If it's an equality delete, it's deleteFilter.requiredSchema(), because it could be expectedSchema + the equality filter column. For example:

SELECT id FROM table

Suppose the equality delete has data == 'aaa'. Then we do need to read the data column too, so it's deleteFilter.requiredSchema(), which is id + data.

Contributor Author

In the case of a pos delete, I can't use expectedSchema either, because there could be both pos deletes and equality deletes.

@huaxingao huaxingao changed the title from "Exclude reading pos_ column if it's not in the scan list" to "Exclude reading _pos column if it's not in the scan list" on Oct 26, 2024
@szehon-ho
Member

szehon-ho commented Nov 1, 2024

Sorry, I still wanted to see if it can be done earlier. What do you think of https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java#L99? This is for the vectorized path, right? Can we pass in a flag to SparkDeleteFilter to not add the column?

It seems a bit wasteful to remove the column after adding it, so just wanted to explore it.

Also, the current fix is specific to Parquet; how about ORC?

@huaxingao
Contributor Author

@szehon-ho Thanks for the comment.

We actually also use the requiredSchema, which is the schema with the _pos column: in ReadConf#generateOffsetToStartPos, we need to know whether a pos delete exists.

We can pass in a flag to SparkDeleteFilter to not add the _pos column, but then I think we need another flag to pass the hasPosDelete info to the Parquet ReaderBuilder, and then on to ReadConf.

ORC uses expectedSchema(), the schema without the _pos column, to build its vectorized readers.
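
For reference, the map built by ReadConf#generateOffsetToStartPos looks roughly like this (simplified, error handling omitted); it lets the reader synthesize _pos per row group without reading any column:

Map<Long, Long> offsetToStartPos = Maps.newHashMap();
long curRowCount = 0;
for (BlockMetaData rowGroup : fileReader.getRowGroups()) {
  // map each row group's starting file offset to the position of its first row
  offsetToStartPos.put(rowGroup.getStartingPos(), curRowCount);
  curRowCount += rowGroup.getRowCount();
}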

@szehon-ho
Member

szehon-ho commented Nov 1, 2024

Hm, then can we just add _pos to requiredSchema (with a comment) at https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java#L83?

To me, adding it earlier and then removing it later is worse for code understanding than just adding it when needed.

Probably cleaner with a flag to ReadConf to trigger generateOffsetToStartPos, but not sure if it's feasible. fyi @aokolnychyi

Contributor

@flyrain flyrain left a comment


Thanks @huaxingao for working on it. Sorry for the delay. Left some comments.

Comment on lines 96 to 99
SparkDeleteFilter sparkDeleteFilter =
    new SparkDeleteFilter(filePath, task.deletes(), counter(), true);

SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null : sparkDeleteFilter;
Contributor

@flyrain flyrain Nov 2, 2024


We don't need a delete filter if task.deletes().isEmpty(), but the new code always creates a filter object. How about this?

SparkDeleteFilter deleteFilter = null;
if (!task.deletes().isEmpty()) {
  deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
}

    Schema requestedSchema,
-   DeleteCounter counter) {
+   DeleteCounter counter,
+   boolean isBatchReading) {
Contributor

Does batch reading necessarily mean no _pos? I believe not, as you can explicitly project it, like select _pos from t. We should give it an accurate name, something like needRowPosition or needRowPosCol.

Contributor

Thinking about it a bit more, I'm wondering whether only the non-vectorized readers actually need an implicit _pos column. If that's the case, would it make more sense to adjust this within RowReader by adding _pos there? This approach could simplify things by eliminating the need to check whether a reader is vectorized, especially since vectorization isn't necessarily strongly correlated with the requirement for _pos.
Here is pseudo code to add it in class RowDataReader:

    LOG.debug("Opening data file {}", filePath);
    expectedSchema().add("_pos");    // <-- pseudo code: add it here
    SparkDeleteFilter deleteFilter =
        new SparkDeleteFilter(filePath, task.deletes(), counter(), false);

 this.eqDeletes = eqDeleteBuilder.build();
-this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
+this.requiredSchema =
+    fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, isBatchReading);
Contributor

@flyrain flyrain Nov 2, 2024


One question I asked myself is whether it impacts the metadata column read. It seems not, but the method DeleteFilter::fileProjection is a bit hard to read; we can refactor it later. It would make more sense to make it a static utility method instead of an instance method. Plus, it's a bit weird to pass the schema to the delete filter and then get it back from the filter. This seems like something we can improve as a follow-up.
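
A sketch of the suggested refactor (hypothetical signature, body elided; the parameters follow the diff above): a static utility keeps the projection a pure function of its inputs instead of round-tripping the schema through the filter instance.

static Schema fileProjection(
    Schema tableSchema,
    Schema requestedSchema,
    List<DeleteFile> posDeletes,
    List<DeleteFile> eqDeletes,
    boolean isBatchReading) { ... }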

-private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
-  if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
+private Map<Long, Long> generateOffsetToStartPos() {
+  if (hasPositionDelete) {
Contributor

I'd recommend doing it in the caller, like this:

    Map<Long, Long> offsetToStartPos = hasPositionDelete ? generateOffsetToStartPos() : null;

Contributor

@flyrain flyrain Nov 3, 2024


BTW, we don't actually need this if PR #10107 is in, which removes the related complexity. The Parquet side has been ready for a while. We may get #10107 in.

    .filter(residual)
    .caseSensitive(caseSensitive())
    .withNameMapping(nameMapping())
+   .hasPositionDelete(readSchema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null)
Contributor

Do we need it for the row reader? Can we use a default value here?

    // read performance as every batch read doesn't have to pay the cost of allocating memory.
    .reuseContainers()
    .withNameMapping(nameMapping())
+   .hasPositionDelete(hasPositionDelete)
Contributor

Wondering if withPositionDelete would be more suitable.

Contributor

The key here is determining whether we want to compute row offsets specifically for the filtered row groups. This decision doesn’t have to be directly tied to the presence of position deletes. Perhaps a name like needRowGroupOffset would better capture this intent and improve clarity.

@huaxingao
Contributor Author

Thanks a lot @flyrain and @szehon-ho for your review! I've thought this over: I feel the original change is much simpler. If the original one looks OK to you, I will revert to it. Thanks a lot!


 Set<Integer> requiredIds = Sets.newLinkedHashSet();
-if (!posDeletes.isEmpty()) {
+if (!posDeletes.isEmpty() && needRowPosCol) {
@sfc-gh-ygu sfc-gh-ygu Nov 8, 2024

Nit: switch the two conditions to make it more readable. Not a blocker though.

if (needRowPosCol && !posDeletes.isEmpty()) {

Contributor

@flyrain flyrain left a comment


+1 Pending pipeline.

Member

@szehon-ho szehon-ho left a comment


Looks good to me

@szehon-ho szehon-ho merged commit 1c576c5 into apache:main Nov 8, 2024
@szehon-ho
Member

Merged. Thanks @huaxingao, and also @flyrain for the review, and @pvary @dramaticlly @viirya for other reviews.

@huaxingao
Contributor Author

Thanks a lot! @flyrain @szehon-ho @viirya @dramaticlly @pvary

@huaxingao huaxingao deleted the pos branch November 9, 2024 00:00
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024