EmrCreateJobFlowOperator should not wait_for_completion if wait_for_completion=false deferrable=true #40966

@ibzx

Apache Airflow version

2.9.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When setting the global option and creating the operator with wait_for_completion=False:

AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true
EmrCreateJobFlowOperator(wait_for_completion=False)

the operator still adds a deferral trigger and waits for the job flow creation to complete.

What you think should happen instead?

Since we did not ask to wait for completion, the deferrable argument should have no effect. It should only start a trigger when wait_for_completion=True.
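The expected behavior described above can be written as a small decision function. This is only a sketch of the intended gating, not the provider's code: `choose_wait_strategy` and the strategy names `"none"`, `"defer"`, and `"poll"` are hypothetical and exist only for illustration.

```python
def choose_wait_strategy(wait_for_completion: bool, deferrable: bool) -> str:
    """Return how the operator should wait after the job flow is created.

    Hypothetical helper: the names and return values are illustrative only.
    """
    if not wait_for_completion:
        # Nobody asked to wait, so deferrable must not matter.
        return "none"
    # Waiting was requested: defer to the triggerer if allowed, else poll in the worker.
    return "defer" if deferrable else "poll"


for wait in (False, True):
    for defer in (False, True):
        strategy = choose_wait_strategy(wait, defer)
        print(f"wait_for_completion={wait}, deferrable={defer} -> {strategy}")
```

Under this gating, wait_for_completion=False yields "none" regardless of the deferrable value, including when it comes from the global default.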

How to reproduce

See deferrable code here:

if self.deferrable:
    self.defer(
        trigger=EmrCreateJobFlowTrigger(
            job_flow_id=self._job_flow_id,
            aws_conn_id=self.aws_conn_id,
            waiter_delay=self.waiter_delay,
            waiter_max_attempts=self.waiter_max_attempts,
        ),
        method_name="execute_complete",
        # timeout is set to ensure that if a trigger dies, the timeout does not restart
        # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
        timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay + 60),
    )
if self.wait_for_completion:
    self._emr_hook.get_waiter("job_flow_waiting").wait(
        ClusterId=self._job_flow_id,
        WaiterConfig=prune_dict(
            {
                "Delay": self.waiter_delay,
                "MaxAttempts": self.waiter_max_attempts,
            }
        ),
    )
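The branch order in the snippet above can be exercised without Airflow or AWS. The following is a minimal stand-in, assuming only that `defer()` interrupts `execute` by raising an exception (as Airflow does with `TaskDeferred`). `Deferred`, `StubOperator`, `execute_tail`, and `outcome` are hypothetical names invented for this sketch.

```python
class Deferred(Exception):
    """Stand-in for the exception Airflow raises when a task defers."""


class StubOperator:
    """Hypothetical stub that mirrors only the branch order quoted above."""

    def __init__(self, wait_for_completion: bool, deferrable: bool) -> None:
        self.wait_for_completion = wait_for_completion
        self.deferrable = deferrable

    def defer(self) -> None:
        # Deferring hands control to a trigger and stops the current execution.
        raise Deferred()

    def execute_tail(self) -> str:
        # Same order as the quoted code: deferrable is checked first,
        # without looking at wait_for_completion.
        if self.deferrable:
            self.defer()
        if self.wait_for_completion:
            return "polled"
        return "returned immediately"


def outcome(wait_for_completion: bool, deferrable: bool) -> str:
    """Run the stub and report which path was taken."""
    try:
        return StubOperator(wait_for_completion, deferrable).execute_tail()
    except Deferred:
        return "deferred"


# The case from this report: no wait requested, but deferrable is on.
print(outcome(wait_for_completion=False, deferrable=True))
```

In this stub the reported combination takes the deferred path, which matches the behavior described in this issue.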

Sample code

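from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

# Python booleans are capitalized; deferrable=True reproduces the global default.
EmrCreateJobFlowOperator(
        task_id="cluster_m5",
        wait_for_completion=False,
        deferrable=True,
)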

Operating System

Mac OS 14.5 (23F79)

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
