-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Pass update compat through as_deterministic_coder and use cloudpickle for deterministic special types. #35725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pass update compat through as_deterministic_coder and use cloudpickle for deterministic special types. #35725
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @claudevdm, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a new update_compatibility_version parameter to the as_deterministic_coder method across the Apache Beam Python SDK's coder hierarchy. This change allows for the propagation of compatibility version information during the conversion of coders to their deterministic forms, which is crucial for maintaining serialization compatibility across different versions of Beam or user-defined types. The modification ensures that this compatibility context is available throughout the coder conversion process, particularly for complex and nested coder structures.
Highlights
- API Change: The
as_deterministic_codermethod across variousCoderimplementations now accepts an optionalupdate_compatibility_versionparameter. This parameter is intended to provide context for compatibility during the conversion process. - Parameter Propagation: The newly introduced
update_compatibility_versionparameter is propagated through nestedas_deterministic_codercalls within composite coders such asMapCoder,NullableCoder,TupleCoder,TupleSequenceCoder, andIterableCoder. - RowCoder Integration: The
RowCoder's constructor (__init__) and itsas_deterministic_codermethod have been updated to accept and utilize theupdate_compatibility_versionparameter, ensuring it's passed down to its component coders when forcing determinism.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request aims to propagate an update_compatibility_version parameter through as_deterministic_coder methods. The changes are applied to many Coder subclasses.
While the parameter is correctly passed down in several composite coders, there are a few issues:
- In
row_coder.py, there's a syntax error in the definition ofRowCoder.as_deterministic_coderthat will prevent the code from running. - In
coders.py, severalas_deterministic_coderimplementations have been updated to accept the new parameter, but they don't use it when creating the new deterministic coder instance. This makes the change incomplete for those coders.
8112f51 to
a6d8607
Compare
6e9be45 to
4597045
Compare
a19eeff to
51e1a59
Compare
|
R: @tvalentyn |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
damccorm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This generally LGTM, I'll defer to @tvalentyn here though
2728c60 to
73b97c4
Compare
73b97c4 to
13ab3c0
Compare
This PR refactors how "special types" are deterministically encoded in Beam Python, moving from
dilltocloudpicklewhile preserving update compatibility.Background:
Previously,
FastPrimitivesCoderuseddillto deterministically encode certain "special types" (e.g.,NamedTuple,Enum,dataclasses). However,PipelineOptions, and specificallyupdate_compatibility_version, were not available during the creation of these deterministic coders. This posed a challenge for maintaining update compatibility across Beam SDK versions.Key Changes and Motivation:
Make
update_compatibility_versionavailable toFastPrimitivesCoder.as_deterministic_coder:update_compatibility_versionfromPipelineOptionsneeds to be accessible when a deterministic coder is constructed.update_compatibility_versionas a variable to the coder registry during pipeline constructionFastPrimitivesCoder.as_deterministic_coderlooks upregistry.update_compatibility_versionwhen deciding what detministic coder to useTransitioning to
cloudpicklefor Special Type Encoding:FastPrimitivesCoderImplfromdilltocloudpickle.force_use_dillparameter is introduced toFastPrimitivesCoderImpland its Cythonized counterpart, allowing control over which pickler is used._verify_dill_compatis added to enforcedill==0.3.1.1whenupdate_compatibility_version=2.67.0is specified, ensuring backward compatibility with older pipelines.encode_type_2_67_0and_unpickle_type_2_67_0methods are introduced to handle thedill-based encoding/decoding for compatibility with versions<=2.67.0.encode_typeanddecode_typemethods inFastPrimitivesCoderImplnow usecloudpickle_pickler.dumpsandcloudpickle_pickler.loadsby default, falling back todillifforce_use_dillis true.Introducing
DeterministicFastPrimitivesCoderV2for Compatibility Checks:<=2.67.0to>2.67.0(ifupdate_compatibility_versionis not explicitly set), a new coderDeterministicFastPrimitivesCoderV2is introduced.update_compatibility_versionflag._update_compatible_deterministic_fast_primitives_coderhelper function is used byFastPrimitivesCoder.as_deterministic_coderto return eitherDeterministicFastPrimitivesCoder(fordill-compatible versions) orDeterministicFastPrimitivesCoderV2(forcloudpickle-based versions).Helper Function for Version Comparison (
is_v1_prior_to_v2):is_v1_prior_to_v2is added toapache_beam.transforms.utilto simplify version comparison logic, replacing a duplicated pattern.Testing:
test_deterministic_coder,test_deterministic_map_coder_is_update_compatible, etc.) have been added tocoders_test_common.pyto verify the behavior of deterministic coders with and withoutupdate_compatibility_version.test_cross_process_encoding_of_special_types_is_deterministictest has been updated to be parameterized bycompat_versionand now explicitly tests thedillfallback for2.67.0and earlier.test_group_by_key_importable_special_typestest confirms GroupByKey functionality with special types and update compatibility versions.test_group_by_key_dynamic_special_typestest demonstrates support for dynamic types by explicitly usingCloudpickleCoderas a fallback.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.