Spark 3.2: Support reading position deletes #4812
Conversation
@@ -0,0 +1,50 @@
/*
Note: moved from BaseFileScanTask, no changes.
@@ -0,0 +1,66 @@
/*
Note: moved from BaseFileScanTask, no changes
@@ -0,0 +1,122 @@
/*
Note: moved from BaseFileScanTask; change: FileScanTask.file() => FileScanTask.contentFile()
.palantir/revapi.yml
justification: "New method added to FileScanTask. Should be not many implementations\
  \ outside Iceberg, if so it's easy to fill in new method (especially if they\
  \ inherit BaseFileScanTask). This is easiest way to add DeleteFileScanTask,\
  \ otherwise even bigger interface change is necessary"
Side note: I'm looking for a way to suppress addedToInterface in addition to a few other non-breaking changes.
The way we release Iceberg, it's not really something that we need to worry about much.
Yea, I didn't know the new policy, thanks, good to know. (I was thinking of trying to add a default method too, though it might be unnecessary.)
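For illustration, a minimal sketch of what such a default could look like; the interface name here is hypothetical and only mirrors the file()/contentFile() pair discussed in this thread, not what the PR actually adds:

```java
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;

// Hypothetical sketch only: a default implementation would keep existing
// implementations source-compatible when the new accessor is added.
interface HypotheticalFileScanTask {
  DataFile file();

  default ContentFile<?> contentFile() {
    return file(); // fall back to the existing data-file accessor
  }
}
```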
this.offsets = ImmutableList.copyOf(offsetList);
this.parentScanTask = parentScanTask;
this.splitSizes = Lists.newArrayListWithCapacity(offsets.size());
if (offsets.size() > 0) {
Is this a situation that can be encountered? And if it can, do we need to ensure that parentScanTask is in fact empty?
I think sometimes it happens? (This part is the original code, I just moved it)
Works for me.
FYI @aokolnychyi @RussellSpitzer @rdblue, if you also have time to take a look.
Let me take a look.
/**
 * @return the file to scan
 */
ContentFile<?> contentFile();
This PR raises a somewhat similar question about the future of FileScanTask as #4870. Maybe we can discuss there a bit.
Instead of adding a new method like contentFile, what about wrapping a delete file as DataFile and using the task without any changes?
Yea, I think that works, although it felt not so clean (DeleteDataFile?). I originally thought it would have been better to have ContentFileScanTask if we started fresh, as all the DataFile fields come from there anyway. Agreed that this way will definitely mean fewer code changes, so I'm OK if everyone agrees.
Will try to follow up on #4870 as well.
@aokolnychyi reverted the refactor to move this forward. As over there it seems there is no need to represent EqualityDelete as DataFile, this becomes the only weird case (outside of the metadata tables, of course). If we want to do it later, it's still open.
}

@Override
public Schema schema() {
It would be nice to include a column that would indicate either the snapshot ID when it was added or the sequence number of the row. We can then have filter pushdown on those columns.
Good idea
@aokolnychyi I added a test to verify that all the metadata columns like _file_path work as expected. So I would say users can join the position_deletes._file_path column with the delete_files table to get snapshot-id and sequence-number as they wish (see the sketch below).
Adding these columns directly to the PositionDeletes metadata table and having filter pushdown will be trickier. This is because the FileScanTask interface as-is does not contain the sequence number or snapshot-id, and we would probably need to subclass it to carry this information. We would also need to add metadata columns just for this metadata table to expose it (currently the only way to add columns to tables that use the Spark RowReader is via metadata columns).
It seems to me a bit of a complicated optimization and maybe not worth the extra code complexity. I'm not sure filtering on certain sequence numbers is a very common query, as in general delete file entries will always be against data files of lower sequence numbers and thus always apply. In any case, the user can accomplish it via a join, a little more expensively.
On the other hand, I could perhaps look at adding partition filter pushdown. There's already a _partition metadata column and I think it will be a more common query to optimize.
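A minimal sketch of the join described above. The table name db.tbl is a placeholder, and the file-path metadata column is assumed here to be selectable as _file (it is referred to as _file_path in the comment); exact names may differ from the final schema:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: attach delete-file level metadata to each position delete row
// by joining on the file path.
class JoinPositionDeletesWithDeleteFiles {
  static Dataset<Row> withDeleteFileMetadata(SparkSession spark) {
    Dataset<Row> positionDeletes = spark.read().format("iceberg")
        .load("db.tbl.position_deletes")
        .selectExpr("*", "_file AS delete_file_path"); // assumed metadata column name

    Dataset<Row> deleteFiles = spark.read().format("iceberg")
        .load("db.tbl.delete_files");

    // Each delete row is matched to its containing delete file's entry,
    // which carries file-level metadata.
    return positionDeletes.join(
        deleteFiles,
        positionDeletes.col("delete_file_path").equalTo(deleteFiles.col("file_path")));
  }
}
```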
Update: the metadata column works but unfortunately does not get pushed down for filtering by Iceberg in general (SparkScanBuilder.pushFilters()). Seems like it would be a useful but orthogonal change.
Investigating making the partition column a non-metadata column, though it's a shame as there already exists the _partition metadata column.
Update 2: Successfully made partition a regular (non-metadata) column, with partition filtering based on it.
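For example, a query of the kind this enables might look like the following sketch; db.tbl and the partition field name dt are assumptions, and the nested fields under partition follow the table's partition spec:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: filter position deletes down to a single partition so the
// partition predicate can be pushed into scan planning.
class FilterPositionDeletesByPartition {
  static Dataset<Row> forPartition(SparkSession spark) {
    return spark.read().format("iceberg")
        .load("db.tbl.position_deletes")
        .filter("partition.dt = '2022-06-01'");
  }
}
```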
I feel it is reasonable to have a metadata table like this. I am not sure about the change in

+1 for using a metadata table for this. It can then be used in numerous places and for various reasons.

While I think a metadata table like this would be helpful, I am not sure it is something we can leverage during compaction. We will need to have access to a list of
Regarding the metadata table, why is it only limited to position deletes? Do we anticipate a separate equality deletes metadata table too? This metadata table is at row level, like a source. For row-level deletes metadata, I assume this position_deletes metadata table is not related to the CDC read like Ryan suggested in the CDC discussion; otherwise, the metadata table should cover inserts, position deletes, and equality deletes.

@stevenzwu yea, I think equality deletes will be a different table because the schema is not the same. And as delete_files is a summary, I thought this would be useful since we are lacking any easy way for users to see the contents of delete files. I guess it will not be exactly the same as a CDC table. @aokolnychyi yea, the idea wrt RewritePositionDeleteFiles was to use these building blocks in conjunction with file-scan-task-set-id for the compaction case. This PR should have most of the pieces, though, and hopefully we just need to add some wiring.
Thanks for confirming.
Addressed the review comments. As mentioned in the second comment, adding some filter pushdown may require a bit of discussion, so I may give it a try here (at least for the partition column) or in a second PR. But in any case, this should be ready as-is for a first review.
@Override
public long targetSplitSize() {
  long tableValue = tableOps().current().propertyAsLong(
I think we may need something here to deal with splittable vs un-splittable files? Currently all of our delete files are Parquet (I believe), which means we have to take our offsets into account when planning and combining splits.
I think this should be fine; currently BaseFileScanTask checks whether the file format is splittable before using this value. I also added tests in the latest PR against all three file formats, with one test method that purposely hits the split limit.
Hm, I'd be surprised if a metadata table does not respect the metadata split size.
The default metadata split size is 32 MB vs the 128 MB data file split size. I initially thought the metadata split size was more suitable for reading metadata than content files (which could be big?), but I can change it.
Changed.
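For reference, a minimal sketch of resolving the data-file split size from table properties, using the property names from Iceberg's TableProperties; the surrounding class is hypothetical, not the PR's code:

```java
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;

// Sketch only: picks the data-file split size (read.split.target-size,
// 128 MB by default) rather than the metadata split size, since the
// position_deletes table reads actual delete file content.
class PositionDeletesSplitSize {
  static long targetSplitSize(TableMetadata current) {
    return current.propertyAsLong(
        TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
  }
}
```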
RussellSpitzer left a comment:
Let me know when this PR is rebased on top of the ScanRefactor with the custom scan task we discussed
  return transformSpec(metadataTableSchema, spec.partitionType());
}

static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {
There is a tricky bug here in partition filter pushdown that took me a while to find.
In most metadata tables, the transformed partition spec is used only in ManifestGroup to filter out matching manifests. In that case, the generated fieldId is not important. However, the position deletes metadata table needs to go one level further and push the filter to ManifestReader to filter matching manifest entries. A wrong fieldId causes the DeleteFile to be instantiated with the wrong partition information (as the field id is used to project onto the partition struct to look up the value), leading to various bugs where the wrong files are filtered out.
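A toy illustration of the field-id lookup involved; the id and field name are made up:

```java
import org.apache.iceberg.types.Types;

// Sketch only: partition values are projected by field id, so a transformed
// spec rebuilt with the wrong id silently misses the value.
class FieldIdLookupExample {
  public static void main(String[] args) {
    Types.StructType partitionType = Types.StructType.of(
        Types.NestedField.optional(1000, "data_bucket", Types.IntegerType.get()));

    System.out.println(partitionType.field(1000)); // finds the partition field
    System.out.println(partitionType.field(1001)); // null: wrong id, value is "lost"
  }
}
```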
Added the partition column and got filter pushdown working, with all the partition spec evolution scenarios. Added a lot of tests in a dedicated file (TestPositionDeletesTable). Will look to see how to incorporate #5077 when it is in.
CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {

  // Filter partitions
  CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles
What about projection?
So I played with selecting some manifest fields, but actually we need all of them (even column stats), because we need to evaluate the filters on the actual delete file, so we need to propagate this down.
Rebased and added Spark 3.3 support, resolved most comments.
public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;
It seems we have a static column in the metadata table that we plan to populate via the mechanism for metadata columns. It looks a little bit suspicious to me.
I feel we should pick one of these options:
- Have only path, pos, row columns in the table and use the _partition and _spec_id metadata columns. That will mean we have to support filter pushdown on metadata columns. It is easy to handle this on the Spark side, but we will also have to adapt ALL of our binding code to allow binding predicates with metadata columns. The last part will be a big change.
- Make partition and spec_id columns static and use DataTask.

I kind of like using FileScanTask for this effort to support vectorized reads, so the first option seems preferable.
Thoughts, @szehon-ho @RussellSpitzer @rdblue?
We don't necessarily have to use DataTask if we decide to go with static columns. We can add a new task type that will leverage another way to set these constant values instead of hacking PartitionUtil and metadata columns. We should be able to reuse most code in our readers and support vectorized reads as well.
The new task will have to extend FileScanTask, though, as it will be part of TableScan.
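A rough sketch of what such a task type could look like; the name and methods are hypothetical, for discussion only, and not what the PR ultimately does:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;

// Hypothetical shape: a task that flows through TableScan like a FileScanTask
// but carries the constant values the reader should attach to every row.
interface PositionDeletesScanTask extends FileScanTask {
  DeleteFile deleteFile();        // the position delete file to read
  StructLike constantPartition(); // partition value emitted as a static column
  int constantSpecId();           // spec id emitted as a static column
}
```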
+1, yea I prefer static (declared) columns in my opinion; I can give it a try to see if adding a new kind of marker FileScanTask helps clean it up.
Partition/spec_id will be important columns for future things we want to do with this table (i.e., compact position deletes, remove dangling position deletes), so having them be metadata columns of a metadata table is a bit too meta! Though the population mechanism will be similar.
My concern with the marker FileScanTask is that if an engine is implementing metadata tables like normal reads, then we've introduced a correctness problem because it doesn't know to read the new task differently. I think the cleanest way is probably to use DataTask.
I think the concern about using DataTask is valid, since it exposes a row-based interface and isn't intended for large uses that benefit from vectorization. It was originally intended for small tables, like snapshots.
However, I've been thinking for a while that a significant improvement is to adapt Arrow record batches into rows, so we can take advantage of vectorized reads in all cases, not just when the engine supports a vectorized format. That is probably way faster, so we could explore doing that here and using a joined row.
Thanks for taking a look. It's getting a bit messier than anticipated.
I spent a lot of time yesterday looking at the marker FileScanTask approach and it does get messy. Especially when trying to add another constant column (spec_id) to this table: not only is it another use of a metadata column to populate it, we also have to figure out how to build a residual expression without the constant column, as otherwise the default file read code filters too aggressively when there is a spec_id filter (the stats do not exist on the file and the pruning code skips it). In short, adding any more constant columns to this table gets a bit messy.
Makes sense to explore using some kind of StaticDataTask. I think the problem there is that currently it would not do split() or implement any file residual filtering on position-delete row values if there are filters there, and of course no vectorization, so we would be re-implementing these if we want them. But maybe I am over-optimizing this, and in the first cut we can live without those and add them later. Can look at this approach and see if I hit any major blockers.
Yeah, +1 for getting a working implementation with DataTask and optimizing as we go.
Hi guys, I hit the first issue: since the code for PositionDeletesTable / DataTask is in the core module, there is currently no way to access Parquet, ORC, and the file readers to implement DataTask::rows(). I mean, Spark could pass in a positionDeleteReader that returns a CloseableIterable, but then it seems a bit silly to use the DataTask (except that DataTask can add the static columns like partition and partition_spec_id).
Another thing that needs to be passed in is encryption (like in DeleteFilter::inputFile()); it seems to be different in Spark/Flink. Let me know if you have any thoughts?
I guess the only thing I can think to do is to provide a shared implementation of positionDeleteReader in the data module, which has access to the Parquet/ORC modules, but each engine would still need to check the type of table (or the marker FileScanTask) and then manually plug this into the DataTask for the position_deletes table.
cc @rdblue @aokolnychyi, appreciate any thoughts. I think it's a good idea to try to make this table usable across engines using the existing DataTask, without extra if-else handling, but I'm not sure we can do it as of today due to the dependency. Will try to explore the above, but it still will not be as clean.
Hey guys, I really tried hard on this but couldn't find a way. We looked with @RussellSpitzer and it looks like Trino actually won't break if we add a new table (they support each one manually, and actually I think a good portion of their metadata tables don't even use our MetadataTable or MetadataTableScans classes altogether). Discussed a bit with @aokolnychyi as well and couldn't find a good way. Made the latest PR with the least bad approach, piggybacking off of some of the latest RowReader refactoring.
IS_DELETED.fieldId(),
SPEC_ID.fieldId(),
PARTITION_COLUMN_ID);
PARTITION_COLUMN_ID,
This is to avoid an issue when trying to read these new constant columns via the Avro reader, which is supported, but only if they are defined in this set. Note: we chose explicitly to not re-use the existing metadata column ids for these constants, as it might cause confusion (metadata columns being a concept for data tables). Without this change, the read fails with:
Caused by: java.lang.IllegalArgumentException: Missing required field: partition
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:104)
at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:42)
at org.apache.iceberg.avro.AvroCustomOrderSchemaVisitor.visit(AvroCustomOrderSchemaVisitor.java:50)
at org.apache.iceberg.avro.AvroSchemaUtil.buildAvroProjection(AvroSchemaUtil.java:126)
at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:68)
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:139)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:131)
at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:75)
at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
at org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:110)
at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:61)
at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:38)
at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:128)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
Some tests were cancelled, re-triggering.
Closing as it's now broken into smaller PRs.
This exposes position deletes as a metadata table "position_deletes", with schema: file, pos, row, partition.
This will be useful when implementing RewritePositionDeleteFiles, as we will read positional deletes from Spark and then write them back. It will also be useful for implementing RemoveDanglingDeleteFiles, i.e., removing delete files that no longer reference live data files. And it will be generally useful for getting more insight into positional delete files as a table via SQL.
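A minimal example of querying the new table from Spark, assuming a placeholder table db.tbl and that the metadata table is addressed the same way as existing metadata tables such as files:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: read the position_deletes metadata table of an Iceberg table.
public class ReadPositionDeletes {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("read-position-deletes")
        .getOrCreate();

    Dataset<Row> deletes = spark.read()
        .format("iceberg")
        .load("db.tbl.position_deletes");

    deletes.printSchema(); // file path, pos, row, and partition columns
    deletes.show();
  }
}
```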
Notes: