-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Use base AWS classes in Glue Trigger / Sensor and implement custom waiter #52243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b6aca49
5dee231
7d69f8f
81f2501
444f1c7
97677a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,7 +60,6 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]): | |
| :param script_args: etl script arguments and AWS Glue arguments (templated) | ||
| :param retry_limit: The maximum number of times to retry this job if it fails | ||
| :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job. | ||
| :param region_name: aws region name (example: us-east-1) | ||
| :param s3_bucket: S3 bucket where logs and local etl script will be uploaded | ||
| :param iam_role_name: AWS IAM Role for Glue Job Execution. If set `iam_role_arn` must equal None. | ||
| :param iam_role_arn: AWS IAM ARN for Glue Job Execution. If set `iam_role_name` must equal None. | ||
|
|
@@ -79,6 +78,17 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]): | |
| Thus if status is returned immediately it might end up in case of more than 1 concurrent run. | ||
| It is recommended to set this parameter to 10 when you are using concurrency=1. | ||
| For more information see: https://repost.aws/questions/QUaKgpLBMPSGWO0iq2Fob_bw/glue-run-concurrent-jobs#ANFpCL2fRnQRqgDFuIU_rpvA | ||
|
|
||
| :param aws_conn_id: The Airflow connection used for AWS credentials. | ||
| If this is ``None`` or empty then the default boto3 behaviour is used. If | ||
| running Airflow in a distributed manner and aws_conn_id is None or | ||
| empty, then default boto3 configuration would be used (and must be | ||
| maintained on each worker node). | ||
| :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. | ||
| :param verify: Whether or not to verify SSL certificates. See: | ||
| https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html | ||
| :param botocore_config: Configuration dictionary (key-values) for botocore client. See: | ||
| https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html | ||
| """ | ||
|
|
||
| aws_hook_class = GlueJobHook | ||
|
|
@@ -122,9 +132,9 @@ def __init__( | |
| verbose: bool = False, | ||
| replace_script_file: bool = False, | ||
| update_config: bool = False, | ||
| job_poll_interval: int | float = 6, | ||
| stop_job_run_on_kill: bool = False, | ||
| sleep_before_return: int = 0, | ||
| job_poll_interval: int | float = 6, | ||
| **kwargs, | ||
| ): | ||
| super().__init__(**kwargs) | ||
|
|
@@ -231,7 +241,8 @@ def execute(self, context: Context): | |
| run_id=self._job_run_id, | ||
| verbose=self.verbose, | ||
| aws_conn_id=self.aws_conn_id, | ||
| job_poll_interval=self.job_poll_interval, | ||
| waiter_delay=int(self.job_poll_interval), | ||
| waiter_max_attempts=self.retry_limit, | ||
| ), | ||
| method_name="execute_complete", | ||
| ) | ||
|
|
@@ -254,7 +265,7 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None | |
|
|
||
| if validated_event["status"] != "success": | ||
| raise AirflowException(f"Error in glue job: {validated_event}") | ||
| return validated_event["value"] | ||
| return validated_event["run_id"] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did we make this change? What are the implications of changing the return value? Is this not a breaking change?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I renamed the key to make it more explicit, see here. That's similar as it is in other services, e.g with the I'm happy to revert it to value if preferred, but as long as the change is consistently applied where it's used, the renaming shouldn't introduce any issues? |
||
|
|
||
| def on_kill(self): | ||
| """Cancel the running AWS Glue Job.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,6 @@ | |
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from functools import cached_property | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from airflow.configuration import conf | ||
|
|
@@ -28,16 +27,16 @@ | |
| from airflow.providers.amazon.aws.triggers.glue import ( | ||
| GlueDataQualityRuleRecommendationRunCompleteTrigger, | ||
| GlueDataQualityRuleSetEvaluationRunCompleteTrigger, | ||
| GlueJobCompleteTrigger, | ||
| ) | ||
| from airflow.providers.amazon.aws.utils import validate_execute_complete_event | ||
| from airflow.providers.amazon.aws.utils.mixins import aws_template_fields | ||
| from airflow.sensors.base import BaseSensorOperator | ||
|
|
||
| if TYPE_CHECKING: | ||
| from airflow.utils.context import Context | ||
|
|
||
|
|
||
| class GlueJobSensor(BaseSensorOperator): | ||
| class GlueJobSensor(AwsBaseSensor[GlueJobHook]): | ||
| """ | ||
| Waits for an AWS Glue Job to reach any of the status below. | ||
|
|
||
|
|
@@ -50,41 +49,86 @@ class GlueJobSensor(BaseSensorOperator): | |
| :param job_name: The AWS Glue Job unique name | ||
| :param run_id: The AWS Glue current running job identifier | ||
| :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False) | ||
| :param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore | ||
| module to be installed. | ||
| (default: False, but can be overridden in config file by setting default_deferrable to True) | ||
| :param poke_interval: Polling period in seconds to check for the status of the job. (default: 120) | ||
| :param max_retries: Number of times before returning the current state. (default: 60) | ||
|
Comment on lines
+55
to
+56
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means we'll wait for 2 hours. Is that a sane default?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The defaults you added in the Trigger are 60 - 75, any reason to not match that here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I followed the pattern used in other Glue sensors to stay consistent, as I was a bit uncertain what to use myself, but as you said probably better to stay with what's there. I can adjust that. |
||
|
|
||
| :param aws_conn_id: The Airflow connection used for AWS credentials. | ||
| If this is ``None`` or empty then the default boto3 behaviour is used. If | ||
| running Airflow in a distributed manner and aws_conn_id is None or | ||
| empty, then default boto3 configuration would be used (and must be | ||
| maintained on each worker node). | ||
| :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. | ||
| :param verify: Whether to verify SSL certificates. See: | ||
| https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html | ||
| :param botocore_config: Configuration dictionary (key-values) for botocore client. See: | ||
| https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html | ||
| """ | ||
|
|
||
| template_fields: Sequence[str] = ("job_name", "run_id") | ||
| SUCCESS_STATES = ("SUCCEEDED",) | ||
| FAILURE_STATES = ("FAILED", "STOPPED", "TIMEOUT") | ||
|
|
||
| aws_hook_class = GlueJobHook | ||
| template_fields: Sequence[str] = aws_template_fields("job_name", "run_id") | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| job_name: str, | ||
| run_id: str, | ||
| verbose: bool = False, | ||
| deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), | ||
| poke_interval: int = 120, | ||
| max_retries: int = 60, | ||
| aws_conn_id: str | None = "aws_default", | ||
| **kwargs, | ||
| ): | ||
| super().__init__(**kwargs) | ||
| self.job_name = job_name | ||
| self.run_id = run_id | ||
| self.verbose = verbose | ||
| self.deferrable = deferrable | ||
| self.poke_interval = poke_interval | ||
| self.max_retries = max_retries | ||
| self.aws_conn_id = aws_conn_id | ||
| self.success_states: list[str] = ["SUCCEEDED"] | ||
| self.errored_states: list[str] = ["FAILED", "STOPPED", "TIMEOUT"] | ||
| self.next_log_tokens = GlueJobHook.LogContinuationTokens() | ||
|
|
||
| @cached_property | ||
| def hook(self): | ||
| return GlueJobHook(aws_conn_id=self.aws_conn_id) | ||
| def execute(self, context: Context) -> Any: | ||
| if self.deferrable: | ||
| self.defer( | ||
| trigger=GlueJobCompleteTrigger( | ||
| job_name=self.job_name, | ||
| run_id=self.run_id, | ||
| verbose=self.verbose, | ||
| aws_conn_id=self.aws_conn_id, | ||
| waiter_delay=int(self.poke_interval), | ||
| waiter_max_attempts=self.max_retries, | ||
| ), | ||
| method_name="execute_complete", | ||
| ) | ||
| else: | ||
| super().execute(context=context) | ||
|
|
||
| def poke(self, context: Context): | ||
| def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: | ||
| validated_event = validate_execute_complete_event(event) | ||
|
|
||
| if validated_event["status"] != "success": | ||
| message = f"Error: AWS Glue Job: {validated_event}" | ||
| raise AirflowException(message) | ||
|
|
||
| self.log.info("AWS Glue Job completed.") | ||
|
|
||
| def poke(self, context: Context) -> bool: | ||
| self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id) | ||
| job_state = self.hook.get_job_state(job_name=self.job_name, run_id=self.run_id) | ||
|
|
||
| try: | ||
| if job_state in self.success_states: | ||
| if job_state in self.SUCCESS_STATES: | ||
| self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state) | ||
| return True | ||
| if job_state in self.errored_states: | ||
| if job_state in self.FAILURE_STATES: | ||
| job_error_message = "Exiting Job %s Run State: %s", self.run_id, job_state | ||
| self.log.info(job_error_message) | ||
| raise AirflowException(job_error_message) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.retry_limitis zero by default which means we don't ever attempt again and we'll fail immediately. This is causing our system tests to fail when run in deferrable mode: