Spark: We should probably say why we cannot process the snapshot in SparkMicroBatchStream #3749
Conversation
kbendick
left a comment
Thank you @hililiwei!
This is a great addition. I left a few notes. If you can find a place that already defines what a valid tableIdentifier is for Spark (to pass to a DataFrame read, etc.), it would be great to not duplicate the definition.
If we can't find one and leave it here, small nit: you can drop the HDFS from the statement about the path. This will work on any supported file system, so I would simply say table instead of HDFS table.
Thanks again @hililiwei!
I left a few comments. If you can find an existing place that defines what a valid table identifier is, I'd say link to that. Otherwise, it's fine as is.
!!! Note
    Iceberg only supports read data from snapshot whose type of Data Operations is APPEND\REPLACE\DELETE. In particular if some of your snapshots are of DELETE type, you need to add 'streaming-skip-delete-snapshots' option to skip it, otherwise the task will fail.
Nit: In the rich diff, this note isn't coming up formatted. Have you verified using mkdocs that this formats like the other parts that use !!!?
Also, we might want to just format this as any other config box vs using the !!! Note statement.
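For readers following along: the mkdocs Admonition extension (which the docs site uses for the other `!!!` callouts) only renders a note box when the body is indented four spaces under the marker, which is likely why the rich diff shows it unformatted. A minimal sketch of the expected shape:

```markdown
!!! Note
    The body must be indented four spaces under the `!!! Note` marker;
    otherwise mkdocs renders it as plain text instead of a note box.
```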
I just want it to show like this:
Ahh ok. Yeah I would agree that makes sense here. 👍
The `tableIdentifier` can be:

* The fully-qualified path to a HDFS table, like `hdfs://nn:8020/path/to/table`
* A table name if the table is tracked by a catalog, like `database.table_name`
Question / Comment: It might be better to just say that the table identifier can be any valid table identifier or table path and link to any existing docs we have on that, instead of repeating the definition here or hiding the definition within the Spark streaming reads section (if we don't have it defined somewhere else).
Is there a place we can link to already?
I can't find a reference anywhere in the document yet. How about I link it to the Javadoc (https://iceberg.apache.org/javadoc/0.12.1/org/apache/iceberg/catalog/TableIdentifier.html)?
Maybe I add its definition to the document in this issue? Or should we create a separate issue to track this?
    Preconditions.checkState(
        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
        "Cannot process snapshot: %s, Structured Streaming does not support snapshots of type %s",
Nit: Can we say .... does not currently support snapshots of type %s? In the future, we will support reading more of them, like we do in Flink.
Also, can we mention the config streaming-skip-delete-snapshots in the Preconditions check? That way, if users get this exception, they know the option to get past it if they'd like.
Maybe like

    Preconditions.checkState(
        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
        "Cannot process snapshot: %s. Structured Streaming does not support snapshots of type %s. To ignore snapshots of type delete, set the config %s to true.",
        snapshot.snapshotId(),
        op.toLowerCase(Locale.ROOT),
        SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
I put the hint to skip delete here:

iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java, lines 204 to 206 in 6b4cd71:

    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
        "Cannot process delete snapshot: %s, to ignore snapshots of type delete, set the config %s to true.",
        snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);

The 'currently' keyword is added.
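For readers outside the review: the `%s`-style templating above works like Guava's `Preconditions.checkState`, which substitutes each `%s` in order with the trailing arguments when the check fails. A minimal self-contained sketch of that behavior (a hand-rolled `checkState` stands in for Guava's, and the constants are hypothetical stand-ins for the real `DataOperations` and `SparkReadOptions` fields):

```java
public class SnapshotCheckSketch {
    // Hypothetical stand-ins for DataOperations.* and
    // SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS
    static final String APPEND = "append";
    static final String REPLACE = "replace";
    static final String DELETE = "delete";
    static final String SKIP_DELETES = "streaming-skip-delete-snapshots";

    // Replaces each %s in order with the given arguments, then throws,
    // mimicking Guava's lenient message templating.
    static void checkState(boolean expression, String template, Object... args) {
        if (!expression) {
            StringBuilder msg = new StringBuilder();
            int start = 0;
            int argIndex = 0;
            int pos;
            while ((pos = template.indexOf("%s", start)) >= 0 && argIndex < args.length) {
                msg.append(template, start, pos).append(args[argIndex++]);
                start = pos + 2;
            }
            msg.append(template.substring(start));
            throw new IllegalStateException(msg.toString());
        }
    }

    static void validateCurrentSnapshot(String op, long snapshotId) {
        checkState(
            op.equals(DELETE) || op.equals(APPEND) || op.equals(REPLACE),
            "Cannot process snapshot: %s. Structured Streaming does not currently support snapshots of type %s. To ignore snapshots of type delete, set the config %s to true.",
            snapshotId, op, SKIP_DELETES);
    }

    public static void main(String[] args) {
        validateCurrentSnapshot(APPEND, 42L); // passes silently
        try {
            validateCurrentSnapshot("overwrite", 43L);
        } catch (IllegalStateException e) {
            // the message names the snapshot id, the offending type, and the skip option
            System.out.println(e.getMessage());
        }
    }
}
```

This is only a sketch of the message shape; the real check uses Guava's `Preconditions` and the actual Iceberg constants.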
## Streaming Reads

Iceberg supports processing incremental data in spark structured streaming  jobs which starts from a historical timestamp:
Typo: double space.
I don't quite understand. Delete the blank line? This blank line seems to be needed.
There are two spaces between "streaming" and "jobs"
Sorry for my carelessness.
Not a problem! This is why we review.
        .load(tableIdentifier)
    ```

The `tableIdentifier` can be any valid table identifier or table path. Refer [TableIdentifier](https://iceberg.apache.org/javadoc/0.12.1/org/apache/iceberg/catalog/TableIdentifier.html)
I don't think that linking to TableIdentifier is helpful. That's a class used internally, but users are going to supply a string here. It would be better to remove this paragraph and replace tableIdentifier with an example string, like load("db.table").
Done. In addition, should we explain TableIdentifier etc. in the https://iceberg.apache.org/#terms/ section or elsewhere? This may be useful to developers.
It may be useful in some of the API documentation.
The `tableIdentifier` can be any valid table identifier or table path. Refer [TableIdentifier](https://iceberg.apache.org/javadoc/0.12.1/org/apache/iceberg/catalog/TableIdentifier.html)

!!! Note
    Iceberg only supports read data from snapshot whose type of Data Operations is APPEND\REPLACE\DELETE. In particular if some of your snapshots are of DELETE type, you need to add 'streaming-skip-delete-snapshots' option to skip it, otherwise the task will fail.
There are a few issues with this paragraph:
- Typo: "only supports reading"
- Typo: "from snapshots"
- Change "snapshots whose type of Data Operations ..." to "append snapshots" because it is much shorter
- As a separate sentence, add that delete and overwrite cannot be processed and will cause an exception
- Then in the last sentence add that deletes can be ignored: "To ignore delete snapshots, add streaming-skip-delete-snapshots=true"
Keep in mind that people reading the documentation probably don't know Iceberg internals. Referring to "Data Operations" is not very clear to most readers.
Force-pushed from a5281ff to 2f11f8c
    ```

!!! Note
    Iceberg only supports reading data from APPEND snapshots. DELETE\OVERWRITE snapshots cannot be processed and will cause an exception. To ignore delete snapshots, add `streaming-skip-delete-snapshots=true` to option.
- There is no need to capitalize append, delete, and overwrite
- "DELETE\OVERWRITE" should be "Delete or overwrite" -- try to stick to plain language rather than shortcuts
- The last two sentences conflict with one another because the first says that a delete will cause an exception. Instead, say that overwrite snapshots will cause an exception and address delete handling in a separate sentence: "By default, delete snapshots will cause an exception, but deletes may be ignored by setting ..."
done, PTAL.
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java (outdated comments, resolved)
| Feature support                                  | Spark 3.0 | Spark 2.4 | Notes |
|--------------------------------------------------|-----------|-----------|-------|
| [DataFrame write](#writing-with-streaming-query) | ✔         | ✔         |       |
| [DataFrame write](#writing-with-streaming-query) | ✔         | ✔         |       |
What changed on this line? Can we roll that back?
Just for alignment. Rolled back.
    ```

!!! Note
    Iceberg only supports reading data from append snapshots. Overwrite snapshots cannot be processed and will cause an exception, similarly, delete snapshots will cause an exception by default, but deletes may be ignored by setting `streaming-skip-delete-snapshots=true`.
The "overwrite" sentence should end after "cause an exception" and "similarly" should start a new sentence. There's a clean break there.
done
    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
        "Cannot process delete snapshot: %s", snapshot.snapshotId());
        "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
        snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
Looks good now.
thanks :)
close #3699