Core: Iceberg streaming-skip-overwrite-snapshots: SparkMicroBatchStream only skips over one file per trigger #8980
Conversation
```diff
  for (ManifestFile manifest : manifestFiles) {
    manifestIndexes.add(Pair.of(manifest, currentFileIndex));
-   currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+   currentFileIndex += manifest.addedFilesCount();
```
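To make the change under discussion concrete, here is a minimal, self-contained sketch of what this loop computes. The `Manifest` record and both helper methods are illustrative stand-ins, not Iceberg's real classes; the point is only that whether existing files are counted decides where the next manifest's added files are indexed.

```java
import java.util.ArrayList;
import java.util.List;

public class ManifestIndexSketch {
    // Minimal stand-in for the two counts this loop reads off a ManifestFile.
    record Manifest(String path, int addedFilesCount, int existingFilesCount) {}

    // Starting index of each manifest, counting only added files
    // (the behavior this PR originally proposed).
    static List<Integer> startIndexesAddedOnly(List<Manifest> manifests) {
        List<Integer> indexes = new ArrayList<>();
        int currentFileIndex = 0;
        for (Manifest m : manifests) {
            indexes.add(currentFileIndex);
            currentFileIndex += m.addedFilesCount();
        }
        return indexes;
    }

    // Starting index of each manifest, counting added + existing files
    // (the behavior on master that was eventually kept).
    static List<Integer> startIndexesWithExisting(List<Manifest> manifests) {
        List<Integer> indexes = new ArrayList<>();
        int currentFileIndex = 0;
        for (Manifest m : manifests) {
            indexes.add(currentFileIndex);
            currentFileIndex += m.addedFilesCount() + m.existingFilesCount();
        }
        return indexes;
    }

    public static void main(String[] args) {
        // A merged manifest carrying 5 existing entries plus 2 new ones,
        // followed by a plain append manifest with 3 new entries.
        List<Manifest> manifests = List.of(
            new Manifest("m1.avro", 2, 5),
            new Manifest("m2.avro", 3, 0));
        System.out.println(startIndexesAddedOnly(manifests));    // [0, 2]
        System.out.println(startIndexesWithExisting(manifests)); // [0, 7]
    }
}
```

The two variants disagree as soon as any manifest has a nonzero existing-files count, which is exactly the case the reviewers could not initially reproduce in a test.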
Can we please add a unit test for this?
I would, but I don't know how.
Yeah, we need to think of a case where this would actually produce incorrect results. I am a bit surprised that there is no existing test case for this code path; when I was adding the rate-limit code, I just refactored this part into a common function.
Yeah, none of the snapshots have an existing-files count set.
Looking at production tables, it's actually pretty rare to see that. I don't know what causes the existing-files count to be set.
@cccs-jc IMHO we should not remove this line until we have coverage for it. Can you please revert this change?
I think it is relatively easy to reason about this part because it is used in only one path.
The existing entries are entries for data files that are already part of a manifest, for example after the metadata has been rewritten because you use a lot of fast appends. With a fast append, a new manifest is written and added to the manifest list; at some point you want to combine these manifests into a bigger one, to reduce the number of calls to the object store and make the planning step faster.
We want to skip a snapshot when only the metadata has been rewritten, so I think this change is correct.
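The lifecycle described above can be sketched with simplified stand-ins (the `Manifest` record and `merge` function here are illustrative, not Iceberg's real classes): each fast append writes its own small manifest, and a later merge rewrites them into one manifest whose entries are now existing rather than added.

```java
import java.util.List;

public class ManifestMergeSketch {
    // Minimal stand-in for a manifest's added/existing entry counts.
    record Manifest(int addedFilesCount, int existingFilesCount) {}

    // Merging manifests reclassifies all previously added entries as
    // existing: the merged manifest introduces no new data files.
    static Manifest merge(List<Manifest> manifests) {
        int carried = manifests.stream()
            .mapToInt(m -> m.addedFilesCount() + m.existingFilesCount())
            .sum();
        return new Manifest(0, carried);
    }

    public static void main(String[] args) {
        // Three fast appends, one small single-file manifest each.
        List<Manifest> fastAppends = List.of(
            new Manifest(1, 0), new Manifest(1, 0), new Manifest(1, 0));
        Manifest merged = merge(fastAppends);
        // The merged manifest has 0 added and 3 existing entries, which is
        // why a streaming reader that only wants new data can skip it.
        System.out.println(merged.addedFilesCount() + " added, "
            + merged.existingFilesCount() + " existing");
    }
}
```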
Exactly. See here, where I show that existingFilesCount is only set when doing rewrites. However, the micro-batch streaming reader skips over rewrites, so it will never happen that existingFilesCount is set in the code above.
It can happen. For example, it is easy to reproduce in TestTransaction::testTransactionRecommit, where a new data file is committed and merged into an existing manifest.
Now I'm questioning skipManifests, and whether it is valid to rely on the counts of manifests at all. But I have to dig deeper into this code, since I'm not too familiar with it.
@singhpk234 I'll put the + manifest.existingFilesCount(); back.
Now that I know how to create manifests with existing file counts, I will add this to the test suite by setting
commit.manifest.min-count-to-merge=3
This way the test will include some manifest entries with existing file counts. I ran the tests again both with and without the + manifest.existingFilesCount(), and all the tests still pass. I think that makes sense, because skipManifests is an optimization.
Have a look and let me know what you think.
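For reference, the merge behavior used in the test setup above is controlled by Iceberg table properties. A minimal configuration might look like the fragment below; the threshold of 3 comes from the comment above, and manifest merging must be enabled for merges to happen at all.

```properties
# merge manifests once at least 3 accumulate (value from the comment above)
commit.manifest.min-count-to-merge=3
# manifest merging is on by default; shown here for completeness
commit.manifest-merge.enabled=true
```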
singhpk234 left a comment:
@cccs-jc I would recommend making the changes one Spark version at a time and then creating back-port PRs. I am not sure what the preferred approach is, but checking one version at a time helps with review and focus.
```diff
      shouldContinueReading = false;
      break;
    }
+   // we found the next available snapshot, continue from there.
```
Is this comment required?
It makes it explicit that we skipped some snapshots and are continuing from nextValid.
```diff
  Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+ // skip over rewrite and delete snapshots
+ while (!shouldProcess(nextSnapshot)) {
```
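The skip behavior in this hunk can be sketched end to end with simplified stand-ins. The `Snapshot` record, the operation strings, and `nextValid` below are illustrative, not Iceberg's real API; the idea is just to walk forward through the snapshot log and skip snapshots that add no new data files.

```java
import java.util.List;

public class SnapshotSkipSketch {
    // Minimal stand-in for a snapshot with its commit operation.
    record Snapshot(long id, String operation) {}

    // Mirrors the shouldProcess check discussed in the review: rewrite
    // ("replace") and delete snapshots add no new data files to read.
    static boolean shouldProcess(Snapshot s) {
        return !s.operation().equals("replace") && !s.operation().equals("delete");
    }

    // Finds the next snapshot worth reading after position `from` in the
    // snapshot log; returns null if none remains.
    static Snapshot nextValid(List<Snapshot> log, int from) {
        for (int i = from + 1; i < log.size(); i++) {
            if (shouldProcess(log.get(i))) {
                return log.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Snapshot> log = List.of(
            new Snapshot(1, "append"),
            new Snapshot(2, "replace"),   // compaction/rewrite: skip
            new Snapshot(3, "delete"),    // delete: skip
            new Snapshot(4, "append"));
        Snapshot next = nextValid(log, 0);
        System.out.println(next.id()); // 4
    }
}
```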
Does shouldProcess handle null?
I think SnapshotUtil.snapshotAfter never returns null.
I removed the 3.4 version. The only version in the commit is 3.5.
@singhpk234 As you recommended, I removed the 3.4 implementation and only kept the 3.5 version. However, the test cases for 3.4 are now failing. Any idea how to fix this? Should I just put back the 3.4 implementation?
What do you think @singhpk234?
@cccs-jc I mean, let's have the changes for 3.5, with their tests, only in 3.5; we can then backport the change with its tests to lower Spark versions like 3.4 and 3.3. The 3.4 test failures are expected, right, since the commit has no SparkMicroBatchStream changes for 3.4. Also, I would request reverting the change in core for Microbatch.java if we don't have coverage for it, as I am unsure when it would fail (maybe some legacy handling). Apologies for being late in getting back to this.
Keeping the existingFilesCount(): what is the purpose of adding that to currentFileIndex? The way I understand it, currentFileIndex is a position over the added files, so we want to count only the added files (addedFilesCount()); these are the files that a streaming job should consume. Can you explain the purpose of including the existing files count?
I am not fully aware of the logic here; I totally agree with you that it makes no sense to keep it. What I am skeptical about is whether there is some handling due to backward compatibility (not sure about that either), and you also suggested that you were not able to populate this field (referring to your comment here: #8980 (comment)). So my rationale was to add this change once we can add some coverage, or better, to take this change out of the current PR, involve more folks in the discussion, and let this PR go in as the functionality to skip overwrite commits only. Please let me know your thoughts.
Fokko left a comment:
This makes sense to me, thanks @cccs-jc for working on this.
@singhpk234 Do you have any further concerns? I'm also very skeptical of counting the added files, and I think we might want to remove that piece of logic (in a separate PR).
+1 on this @Fokko. Other than this, no further concerns from my end!
Closes #8902
@singhpk234 I have fixed issue #8902. Could you have a look at it?