Skip to content

Conversation

@liferoad
Copy link
Contributor

The AppliedPTransform initializer would unconditionally attempt to call get_resource_hints() on a transform object. This could cause an AttributeError if a PTransform implementation does not define this method.

This change adds an hasattr check to verify the existence of the get_resource_hints method before calling it, preventing the potential crash and making the pipeline construction more robust.

Addresses #33854 (comment)


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

The AppliedPTransform initializer would unconditionally attempt to call
`get_resource_hints()` on a transform object. This could cause an
AttributeError if a PTransform implementation does not define this method.

This change adds an `hasattr` check to verify the existence of the
`get_resource_hints` method before calling it, preventing the potential
crash and making the pipeline construction more robust.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @liferoad, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request improves the stability of the Apache Beam Python SDK by addressing a specific AttributeError that could arise during pipeline construction. It ensures that resource hint retrieval logic gracefully handles PTransform implementations that do not explicitly define a get_resource_hints method, thereby preventing crashes and making the system more resilient.

Highlights

  • Bug Fix: Resolved an AttributeError that occurred in the AppliedPTransform initializer when a transform object lacked a get_resource_hints() method.
  • Robustness Improvement: Introduced an hasattr check to verify the existence of the get_resource_hints method before attempting to call it, preventing potential crashes.
  • Pipeline Stability: Enhanced the robustness of pipeline construction in the Python SDK by handling PTransform implementations that do not define resource hints.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@GaetanLepage
Copy link

After applying #36238 and this one, I get:

      self.side_inputs = (
          tuple() if transform is None else getattr(
              transform, 'side_inputs', tuple()))
      self.outputs = {}  # type: Dict[Union[str, int, None], pvalue.PValue]
      self.parts = []  # type: List[AppliedPTransform]
      self.environment_id = environment_id if environment_id else None  # type: Optional[str]
      # We may need to merge the hints with environment-provided hints here
      # once environment is a first-class citizen in Beam graph and we have
      # access to actual environment, not just an id.
      self.resource_hints = dict(
          transform.get_resource_hints()) if transform and hasattr(
              transform, 'get_resource_hints') else {}  # type: Dict[str, bytes]

      if transform:
        annotations = {
>           **(annotations or {}), **encode_annotations(transform.annotations())
                                                        ^^^^^^^^^^^^^^^^^^^^^
        }
E       AttributeError: 'MaybeReshuffle' object has no attribute 'annotations'

apache_beam/pipeline.py:1244: AttributeError

@liferoad
Copy link
Contributor Author

Thanks. Updated annotations.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@GaetanLepage
Copy link

One more apparently:

      # Ordering is important here.
      # TODO(https://github.com/apache/beam/issues/20136): use key, value pairs
      # instead of depending on tags with index as a suffix.
      indexed_side_inputs = [
          (get_sideinput_index(tag), context.pcollections.get_by_id(id))
          for tag, id in proto.inputs.items() if tag in side_input_tags
      ]
      side_inputs = [si for _, si in sorted(indexed_side_inputs)]

      result = AppliedPTransform(
          parent=None,
          transform=transform,
          full_label=proto.unique_name,
          main_inputs=main_inputs,
          environment_id=None,
          annotations=proto.annotations)

>     if result.transform and result.transform.side_inputs:
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E     AttributeError: 'MaybeReshuffle' object has no attribute 'side_inputs'

apache_beam/pipeline.py:1526: AttributeError

@github-actions
Copy link
Contributor

Assigning reviewers:

R: @damccorm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@liferoad
Copy link
Contributor Author

I did more checks. Please help validate this. Thanks.

@GaetanLepage
Copy link

I did more checks. Please help validate this. Thanks.

self = <apache_beam.pipeline.Pipeline.to_runner_api.<locals>.ForceKvInputTypes object at 0x7ffddbed8ec0>
transform_node = AppliedPTransform(assert_that/Create/MaybeReshuffle, MaybeReshuffle)

    def visit_transform(self, transform_node):
      # type: (AppliedPTransform) -> None
      if not transform_node.transform:
        return
>     if transform_node.transform.runner_api_requires_keyed_input():
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E     AttributeError: 'MaybeReshuffle' object has no attribute 'runner_api_requires_keyed_input'

apache_beam/pipeline.py:1026: AttributeError

@liferoad
Copy link
Contributor Author

I did more checks. Please help validate this. Thanks.

self = <apache_beam.pipeline.Pipeline.to_runner_api.<locals>.ForceKvInputTypes object at 0x7ffddbed8ec0>
transform_node = AppliedPTransform(assert_that/Create/MaybeReshuffle, MaybeReshuffle)

    def visit_transform(self, transform_node):
      # type: (AppliedPTransform) -> None
      if not transform_node.transform:
        return
>     if transform_node.transform.runner_api_requires_keyed_input():
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E     AttributeError: 'MaybeReshuffle' object has no attribute 'runner_api_requires_keyed_input'

apache_beam/pipeline.py:1026: AttributeError

Added one check for that.

@GaetanLepage
Copy link

      def transform_to_runner_api(
          transform,  # type: Optional[ptransform.PTransform]
          context  # type: PipelineContext
      ):
        # type: (...) -> Optional[beam_runner_api_pb2.FunctionSpec]
        if transform is None:
          return None
        else:
          # We only populate inputs information to ParDo in order to expose
          # key_coder and window_coder to stateful DoFn.
          if isinstance(transform, ParDo):
            return transform.to_runner_api(
                context,
                has_parts=bool(self.parts),
                named_inputs=self.named_inputs())
          return transform.to_runner_api(context, has_parts=bool(self.parts))

      # Iterate over inputs and outputs by sorted key order, so that ids are
      # consistently generated for multiple runs of the same pipeline.
      try:
        transform_spec = transform_to_runner_api(self.transform, context)
      except Exception as exn:
>       raise RuntimeError(f'Unable to translate {self.full_label}') from exn
E       RuntimeError: Unable to translate assert_that/Create/MaybeReshuffle

@codecov
Copy link

codecov bot commented Sep 24, 2025

Codecov Report

❌ Patch coverage is 76.92308% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 40.17%. Comparing base (bbf3613) to head (a3d05eb).
⚠️ Report is 58 commits behind head on master.

Files with missing lines Patch % Lines
sdks/python/apache_beam/pipeline.py 76.92% 3 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #36251       +/-   ##
=============================================
- Coverage     56.84%   40.17%   -16.67%     
  Complexity     3386     3386               
=============================================
  Files          1220     1220               
  Lines        185898   185904        +6     
  Branches       3523     3523               
=============================================
- Hits         105672    74688    -30984     
- Misses        76885   107875    +30990     
  Partials       3341     3341               
Flag Coverage Δ
python 40.53% <76.92%> (-40.40%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@liferoad
Copy link
Contributor Author

      def transform_to_runner_api(
          transform,  # type: Optional[ptransform.PTransform]
          context  # type: PipelineContext
      ):
        # type: (...) -> Optional[beam_runner_api_pb2.FunctionSpec]
        if transform is None:
          return None
        else:
          # We only populate inputs information to ParDo in order to expose
          # key_coder and window_coder to stateful DoFn.
          if isinstance(transform, ParDo):
            return transform.to_runner_api(
                context,
                has_parts=bool(self.parts),
                named_inputs=self.named_inputs())
          return transform.to_runner_api(context, has_parts=bool(self.parts))

      # Iterate over inputs and outputs by sorted key order, so that ids are
      # consistently generated for multiple runs of the same pipeline.
      try:
        transform_spec = transform_to_runner_api(self.transform, context)
      except Exception as exn:
>       raise RuntimeError(f'Unable to translate {self.full_label}') from exn
E       RuntimeError: Unable to translate assert_that/Create/MaybeReshuffle

Updated the PR to capture this.

@GaetanLepage
Copy link

self = AppliedPTransform(assert_that/Create/MaybeReshuffle, MaybeReshuffle)

    def named_inputs(self):
      # type: () -> Dict[str, pvalue.PValue]
      if self.transform is None:
        assert not self.main_inputs and not self.side_inputs
        return {}
      else:
>       named_inputs = self.transform._named_inputs(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            self.main_inputs, self.side_inputs)
E       AttributeError: 'MaybeReshuffle' object has no attribute '_named_inputs'

apache_beam/pipeline.py:1400: AttributeError

@liferoad
Copy link
Contributor Author

self = AppliedPTransform(assert_that/Create/MaybeReshuffle, MaybeReshuffle)

    def named_inputs(self):
      # type: () -> Dict[str, pvalue.PValue]
      if self.transform is None:
        assert not self.main_inputs and not self.side_inputs
        return {}
      else:
>       named_inputs = self.transform._named_inputs(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            self.main_inputs, self.side_inputs)
E       AttributeError: 'MaybeReshuffle' object has no attribute '_named_inputs'

apache_beam/pipeline.py:1400: AttributeError

Pushed one more fix for this. :)

@GaetanLepage
Copy link

Thanks! The test passes now!

return
replacement_transform.side_inputs = tuple(
original_transform_node.transform.side_inputs)
getattr(original_transform_node.transform, 'side_inputs', ()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than doing all of these attribute checks, can we just set these property to empty values when we initialize the object?

def __init__(self, label=None):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is not to set these property. It is caused by the nested MaybeReshuffle. Any fix in MaybeReshuffle could cause the update-compatibly issue. That is why we did #36238

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite follow - are you saying adding these properties to the PTransform class would cause update incompatibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check https://github.com/apache/beam/pull/36184/files#r2359516983: MaybeReshuffle is defined dynamically (inside Create.expand?) which is affecting the inheritance.

The fields should be there if MaybeReshuffle was not nested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, chimed in on that thread. I think we should fix the core label issue which is causing this issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this fix is much better since it can handle other nested transforms.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So would updating

def __init__(self, label=None):

The reason https://github.com/apache/beam/pull/36184/files#r2359516983 was breaking is because:

  1. There were transforms which didn't have explicit labels
  2. Those transforms get autoassigned names which include the line number. For example Map(<lambda at bigquery_file_loads.py:1157>) in https://github.com/apache/beam/pull/34807/files
  3. When we change the file, the line number that those transforms land on is no longer the same

So if we:

  1. Explicitly name the transform which is getting assigned a name with a line number
  2. Add these properties to
    def __init__(self, label=None):

Then we should fix this issue while avoiding any breaking changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what you mean here. The nested transform misses many fields (check the rest of my PR), which are not needed when the transform is nested. My PR can make sure any future nested transform should work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nested transform misses many fields (check the rest of my PR), which are not needed when the transform is nested. My PR can make sure any future nested transform should work.

I agree your PR works. But it is quite messy - for example, we check for the existence of a side_inputs property 3 times when the object is always a PTransform object. It seems much cleaner to just guarantee that this property will always exist on PTransform objects. This also means that if we use these properties elsewhere (now or in the future), we don't need to do more of these kinds of checks.

It seem reasonable to me that PTransform should have these fields in all cases. An alternative would be PTransform providing some functions to get these properties if they exist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless, this is a minor code quality issue and not a correctness one. It doesn't need to block the PR if you disagree.

return
replacement_transform.side_inputs = tuple(
original_transform_node.transform.side_inputs)
getattr(original_transform_node.transform, 'side_inputs', ()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nested transform misses many fields (check the rest of my PR), which are not needed when the transform is nested. My PR can make sure any future nested transform should work.

I agree your PR works. But it is quite messy - for example, we check for the existence of a side_inputs property 3 times when the object is always a PTransform object. It seems much cleaner to just guarantee that this property will always exist on PTransform objects. This also means that if we use these properties elsewhere (now or in the future), we don't need to do more of these kinds of checks.

It seem reasonable to me that PTransform should have these fields in all cases. An alternative would be PTransform providing some functions to get these properties if they exist.

return
replacement_transform.side_inputs = tuple(
original_transform_node.transform.side_inputs)
getattr(original_transform_node.transform, 'side_inputs', ()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless, this is a minor code quality issue and not a correctness one. It doesn't need to block the PR if you disagree.

@liferoad
Copy link
Contributor Author

liferoad commented Oct 3, 2025

I am going to merge this for now since this covers the needs for #33854 (comment)

@liferoad liferoad merged commit 1e97363 into apache:master Oct 3, 2025
160 of 176 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants