@hussein-awala
Member

closes: #29903


Currently, BaseOperator overrides a mapped task's partial arguments with the DAG default args whenever those defaults are provided, regardless of the values passed to partial. By definition, Airflow should use the default args only when the operator argument is not provided.

This PR fills the partial_kwargs dictionary with the positional and keyword arguments passed to the partial method, and then replaces only the None values with the corresponding default args values.
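
The merging rule described above can be sketched in a few lines. This is a minimal illustration, not the code from this PR: `NOTSET`, `merge_partial_kwargs`, and the sample values are hypothetical names chosen for the example. It follows the later commit in this PR that switches from None to a NotSet sentinel so that None stays a legitimate explicit value, and it shows why a falsy value such as 0 must not be treated as "missing".

```python
from __future__ import annotations

from typing import Any


class _NotSet:
    """Hypothetical sentinel meaning 'the caller did not pass this argument'."""

    def __repr__(self) -> str:
        return "NOTSET"


NOTSET = _NotSet()


def merge_partial_kwargs(
    partial_kwargs: dict[str, Any],
    default_args: dict[str, Any],
) -> dict[str, Any]:
    """Fill in defaults only for arguments the caller left unset.

    Explicit values win, including falsy ones such as 0 or None.
    Unset arguments with no matching default stay NOTSET.
    """
    merged = dict(partial_kwargs)
    for name, value in partial_kwargs.items():
        # Identity check against the sentinel, not a truthiness check:
        # `value or default` would wrongly replace retries=0.
        if value is NOTSET and name in default_args:
            merged[name] = default_args[name]
    return merged


# Illustrative values only.
default_args = {"retries": 3, "retry_delay": 300, "owner": "data-team"}
partial_kwargs = {"retries": 0, "retry_delay": NOTSET, "owner": None}

merged = merge_partial_kwargs(partial_kwargs, default_args)
print(merged)
```

Here the explicit `retries=0` and `owner=None` are kept, and only `retry_delay`, which was never passed, picks up the default.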

@hussein-awala hussein-awala requested a review from uranusjr as a code owner March 4, 2023 01:15
@hussein-awala hussein-awala added this to the Airflow 2.5.2 milestone Mar 4, 2023
@potiuk
Member

potiuk commented Mar 4, 2023

cc: @uranusjr . This definitely needs your insight :)

@uranusjr
Member

May be related: #29366

Member

@uranusjr uranusjr left a comment

I added a commit to tweak the merging part. Should not affect the functionality, I think, but please take another look to make sure.

This should improve iteration a bit, I think.
@uranusjr uranusjr force-pushed the fix/mapped_tasks_args branch from 819b262 to cd142b6 Compare April 13, 2023 06:46
@hussein-awala hussein-awala force-pushed the fix/mapped_tasks_args branch from bb92fe4 to 7a59ec7 Compare April 14, 2023 11:00
@potiuk potiuk modified the milestones: Airflow 2.6.1, Airflow 2.6.0 Apr 14, 2023
@hussein-awala hussein-awala merged commit f01051a into apache:main Apr 14, 2023
ephraimbuddy pushed a commit that referenced this pull request Apr 14, 2023
…#29913)

* Add a failing test to make it pass

* use partial_kwargs when they are provided and override only None values by dag default values

* update the test and check if the values are filled in the right order

* fix overriding retry_delay with default value when it is equal to 0

* add missing default value for inlets and outlets

* set partial_kwargs dict type to dict[str, Any] and remove type ignore comments

* create a dict for default values and use NotSet instead of None to support None as accepted value

* update partial typing by removing None type from some args and set NotSet for all args

* Tweak kwarg merging slightly

This should improve iteration a bit, I think.

* Fix unit tests

---------

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit f01051a)
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Apr 14, 2023
wookiist pushed a commit to wookiist/airflow that referenced this pull request Apr 19, 2023
…apache#29913)

@mpgreg

mpgreg commented Aug 3, 2023

Is there any reason this fix wouldn't work with the branch decorator as well?

I'm on Astronomer Runtime 8.8.0, based on Airflow 2.6.3+astro.2, and I branch to dynamically mapped tasks:

@task.branch
def check_object_count(doc_count: dict) -> str:
    response = blah

    if response:
        return None
    else:
        return ["get_md_docs", "get_rst_docs", "get_code_samples", "get_stack_overflow"]

@task(trigger_rule='none_failed')
def get_md_docs(source: dict):
    return blah

@task(trigger_rule='none_failed')
def get_rst_docs(source: dict):
    return blah

@task(trigger_rule='none_failed')
def get_code_samples(source: dict):
    return blah

@task(trigger_rule='none_failed')
def get_stack_overflow(source: dict):
    return blah

md_docs = get_md_docs.partial().expand(source=markdown_docs_sources)
rst_docs = get_rst_docs.partial().expand(source=rst_docs_sources)
code_samples = get_code_samples.partial().expand(source=code_samples_sources)
stackoverflow_md = get_stack_overflow.partial().expand(tag=stackoverflow_tags)

All downstream mapped tasks fail with:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1531, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2179, in render_templates
    original_task.render_template_fields(context)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 692, in render_template_fields
    unmapped_task = self.unmap(mapped_kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 615, in unmap
    op = self.operator_class(**kwargs, _airflow_from_mapped=True)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/usr/local/lib/python3.10/site-packages/airflow/decorators/python.py", line 49, in __init__
    super().__init__(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/usr/local/lib/python3.10/site-packages/airflow/decorators/base.py", line 212, in __init__
    super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/usr/local/lib/python3.10/site-packages/airflow/operators/python.py", line 166, in __init__
    super().__init__(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 788, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: get_md_docs__1). Invalid arguments were:
**kwargs: {'postgres_conn_id': 'postgres_myid'}

@mpgreg

mpgreg commented Aug 3, 2023

If I remove the default_args it runs okay.
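
A possible reading of the traceback above, offered as an assumption rather than a diagnosis: the DAG's default_args contained a key such as postgres_conn_id that the decorated Python operator does not accept, and that key reached the operator constructor when the mapped task was unmapped. The sketch below is not Airflow code. `applicable_defaults` and `make_python_task` are hypothetical names, and it only illustrates the general idea of applying a default to a callable only when the callable's signature names that parameter.

```python
from __future__ import annotations

import inspect
from typing import Any, Callable


def applicable_defaults(
    func: Callable[..., Any],
    default_args: dict[str, Any],
) -> dict[str, Any]:
    """Keep only the defaults that `func` declares as named parameters."""
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    accepted = {
        name
        for name, param in inspect.signature(func).parameters.items()
        if param.kind not in variadic
    }
    return {key: value for key, value in default_args.items() if key in accepted}


def make_python_task(task_id: str, retries: int = 0) -> dict[str, Any]:
    """Hypothetical stand-in for an operator constructor with a strict signature."""
    return {"task_id": task_id, "retries": retries}


# Assumed shape of the reporter's default_args; only the key from the
# error message is taken from the traceback.
default_args = {"retries": 2, "postgres_conn_id": "postgres_myid"}

# Filtered defaults: the unrelated connection id is dropped.
filtered = applicable_defaults(make_python_task, default_args)
task = make_python_task(task_id="get_md_docs", **filtered)

# Unfiltered defaults: the extra key is rejected by the callable.
try:
    make_python_task(task_id="get_md_docs", **default_args)
    rejected = False
except TypeError:
    rejected = True
```

With filtering, the shared default_args can carry keys meant for other operators without breaking this one. Without it, the call fails in the same spirit as the "Invalid arguments were passed" error.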

Development

Successfully merging this pull request may close these issues.

Task-level retries overrides from the DAG-level default args are not respected when using partial

6 participants