Description
Apache Airflow version: 2.1.0
apache-airflow-providers-amazon version: 1.4.0
Kubernetes version (if you are using kubernetes) (use kubectl version):
Environment:
- Cloud provider or hardware configuration: AWS EC2
- OS (e.g. from /etc/os-release): Amazon Linux 2
- Kernel (e.g. uname -a):
- Install tools:
- Others:
What happened:
I'm trying to integrate the AWS SES SMTP service with Airflow to send emails on task failures.
Below are the relevant settings in my airflow.cfg file:
[email]
email_backend = airflow.providers.amazon.aws.utils.emailer.send_email
email_conn_id = aws_default
[smtp]
smtp_host = email-smtp.us-east-1.amazonaws.com
smtp_starttls = True
smtp_ssl = False
smtp_user = SMTP_USER_KEY
smtp_port = 587
smtp_password = SMTP_PASSWORD
smtp_mail_from = myemail@mycompany.com
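airflow.cfg uses INI syntax, so one quick way to confirm that the sender address is actually present is to parse the settings with Python's standard `configparser`. This is a minimal sketch with the settings above inlined as a string; against a real install you would call `read()` on your own airflow.cfg path instead.

```python
import configparser

# The [email] and [smtp] settings from above, inlined so the check is self-contained.
CFG_TEXT = """
[email]
email_backend = airflow.providers.amazon.aws.utils.emailer.send_email
email_conn_id = aws_default

[smtp]
smtp_host = email-smtp.us-east-1.amazonaws.com
smtp_starttls = True
smtp_ssl = False
smtp_user = SMTP_USER_KEY
smtp_port = 587
smtp_password = SMTP_PASSWORD
smtp_mail_from = myemail@mycompany.com
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Pull out the values the SES backend would need.
backend = parser.get("email", "email_backend")
mail_from = parser.get("smtp", "smtp_mail_from")
port = parser.getint("smtp", "smtp_port")
starttls = parser.getboolean("smtp", "smtp_starttls")

print(backend)
print(mail_from, port, starttls)
```

If this prints the expected address, the sender is present in the config, which points at the backend not reading it rather than at a missing setting.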
My Test DAG:
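from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

args = {
    'owner': 'Data Platform',
    'depends_on_past': False,
    'start_date': datetime(2018, 11, 12, 0, 0, 0, 0),
    'email': [Variable.get('smtp_pd_email')],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
}
dag_name = 'test_smtp'
dag = DAG(dag_id=dag_name,
          default_args=args,
          schedule_interval='0 8 * * *')
# 'eccho' (misspelled) makes the command fail, which is what triggers the failure email.
greet = BashOperator(
    task_id='csv_to_s3',
    bash_command="""
    eccho "Hello world!"
    """,
    dag=dag)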
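The task fails because `eccho` is not a command, so the shell exits with status 127 ("command not found"), and BashOperator raises the AirflowException shown at the top of the traceback below. A minimal sketch of that exit code outside Airflow, using `sh` through `subprocess` for portability (BashOperator itself runs bash, which uses the same 127 status):

```python
import subprocess

# Run the same misspelled command the test DAG uses.
result = subprocess.run(
    ["sh", "-c", 'eccho "Hello world!"'],
    capture_output=True,
    text=True,
)

# 127 is the shell's "command not found" status. Any non-zero exit code
# makes BashOperator raise AirflowException and mark the task failed.
print(result.returncode)
print(result.stderr.strip())
```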
What you expected to happen: With the email and SMTP settings above and the AWS SES service set up, an email should have been sent when the DAG failed.
Instead, when the DAG failed, I got the error below and no email was sent:
[2021-07-01 16:43:37,212] {taskinstance.py:1537} ERROR - Failed to send email to: ['airflow-critical-alert@company.com']
Traceback (most recent call last):
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/operators/bash.py", line 180, in execute
raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1878, in email_alert
send_email(self.task.email, subject, html_content)
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/utils/email.py", line 66, in send_email
**kwargs,
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/providers/amazon/aws/utils/emailer.py", line 48, in send_email
mime_charset=mime_charset,
File "/data/environments/airflow/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/ses.py", line 96, in send_email
Source=mail_from, Destinations=recipients, RawMessage={'Data': message.as_string()}
File "/data/environments/airflow/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/data/environments/airflow/lib/python3.7/site-packages/botocore/client.py", line 649, in _make_api_call
api_params, operation_model, context=request_context)
File "/data/environments/airflow/lib/python3.7/site-packages/botocore/client.py", line 697, in _convert_to_request_dict
api_params, operation_model)
File "/data/environments/airflow/lib/python3.7/site-packages/botocore/validate.py", line 297, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Source, value: None, type: <class 'NoneType'>, valid types: <class 'str'>
Tracing the error back, I found the issue is in airflow/providers/amazon/aws/utils/emailer.py (the Airflow email backend for AWS SES, referred to as emailer.py below).
The send_email function in emailer.py calls SESHook.send_email with mail_from=None. SESHook passes that value on as the Source parameter (see the ses.py frame in the traceback), and botocore rejects None for Source, which is why the ParamValidationError is raised.
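The failure can be reproduced without AWS. In this sketch, `FakeSESClient` is a hypothetical stand-in that only mimics botocore's type check on `Source` (it is not botocore's code), and `send_raw` is a hypothetical helper that forwards `mail_from` as `Source`, the way the ses.py frame in the traceback does.

```python
from typing import List


class FakeSESClient:
    """Hypothetical stand-in for a boto3 SES client.

    Only mimics the type check on Source that made botocore raise
    ParamValidationError; it does not send anything.
    """

    def send_raw_email(self, Source, Destinations, RawMessage):
        if not isinstance(Source, str):
            raise TypeError(
                f"Invalid type for parameter Source, value: {Source!r}, "
                f"type: {type(Source)}, valid types: {str}"
            )
        return {"MessageId": "fake-id"}


def send_raw(client: FakeSESClient, mail_from, recipients: List[str], raw: str):
    # Hypothetical helper: forwards mail_from as Source, like SESHook.send_email.
    return client.send_raw_email(
        Source=mail_from, Destinations=recipients, RawMessage={"Data": raw}
    )


client = FakeSESClient()
recipients = ["airflow-critical-alert@company.com"]

# Per this report, emailer.send_email ends up passing mail_from=None.
try:
    send_raw(client, None, recipients, "body")
    failed = False
except TypeError as exc:
    failed = True
    print(exc)

# With a real sender address the same call goes through.
ok = send_raw(client, "myemail@mycompany.com", recipients, "body")
print(ok["MessageId"])
```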
Updating the SESHook.send_email call in emailer.py as follows worked for me:
from typing import List, Optional, Union

from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.ses import SESHook


def send_email(
    to: Union[List[str], str],
    subject: str,
    html_content: str,
    files: Optional[List] = None,
    cc: Optional[Union[List[str], str]] = None,
    bcc: Optional[Union[List[str], str]] = None,
    mime_subtype: str = 'mixed',
    mime_charset: str = 'utf-8',
    conn_id: str = 'aws_default',
    **kwargs,
) -> None:
    """Email backend for SES."""
    hook = SESHook(aws_conn_id=conn_id)
    hook.send_email(
        # Read the sender from [smtp] smtp_mail_from instead of passing None.
        mail_from=conf.get('smtp', 'smtp_mail_from'),
        to=to,
        subject=subject,
        html_content=html_content,
        files=files,
        cc=cc,
        bcc=bcc,
        mime_subtype=mime_subtype,
        mime_charset=mime_charset,
    )
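The patched call always takes the sender from `[smtp] smtp_mail_from`. A slightly more flexible sketch lets a caller pass an explicit sender, falls back to the config value, and raises a clear error instead of letting None reach botocore. `resolve_mail_from` is a hypothetical helper, and a plain dict stands in for Airflow's `conf` object.

```python
from typing import Mapping, Optional


def resolve_mail_from(
    explicit: Optional[str],
    config: Mapping[str, Mapping[str, str]],
) -> str:
    """Hypothetical helper: pick the SES Source address.

    Order: an explicitly passed sender, then [smtp] smtp_mail_from from
    the config. Raises ValueError instead of returning None.
    """
    if explicit:
        return explicit
    value = config.get("smtp", {}).get("smtp_mail_from")
    if value:
        return value
    raise ValueError(
        "No sender address: pass one explicitly or set [smtp] smtp_mail_from"
    )


# A plain dict standing in for Airflow's conf object.
cfg = {"smtp": {"smtp_mail_from": "myemail@mycompany.com"}}

print(resolve_mail_from(None, cfg))                    # falls back to the config value
print(resolve_mail_from("alerts@mycompany.com", cfg))  # explicit sender wins

# With no sender anywhere, the error names the missing setting.
try:
    resolve_mail_from(None, {})
except ValueError as exc:
    print(exc)
```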
How to reproduce it:
- Create an SES domain, verify the sender email address, and generate SMTP credentials
- Add the settings shown above to airflow.cfg
- Change smtp_host to match your AWS region if needed
- Run the test DAG (code provided above)
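As the traceback shows, the SES backend configured under [email] goes through SESHook and botocore, i.e. the SES API with the aws_default connection, rather than through the [smtp] user and password. To check the SES SMTP credentials on their own, a minimal standard-library sketch is below. `build_test_message` and `send_test_message` are hypothetical helpers, and the host, port, credentials, and addresses are the placeholders from this report.

```python
import smtplib
from email.message import EmailMessage


def build_test_message(mail_from: str, to: str) -> EmailMessage:
    """Hypothetical helper: a plain-text test message."""
    msg = EmailMessage()
    msg["From"] = mail_from
    msg["To"] = to
    msg["Subject"] = "Airflow SES SMTP test"
    msg.set_content("If you can read this, the SES SMTP credentials work.")
    return msg


def send_test_message(
    msg: EmailMessage,
    host: str = "email-smtp.us-east-1.amazonaws.com",
    port: int = 587,
    user: str = "SMTP_USER_KEY",
    password: str = "SMTP_PASSWORD",
) -> None:
    """Hypothetical helper: STARTTLS on port 587, matching the [smtp] settings."""
    with smtplib.SMTP(host, port, timeout=30) as server:
        server.starttls()
        server.login(user, password)
        server.send_message(msg)


msg = build_test_message("myemail@mycompany.com", "airflow-critical-alert@company.com")
print(msg["Subject"])
# send_test_message(msg)  # uncomment with real credentials; needs network access
```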
Anything else we need to know: