diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py index ab0d7e4baf15d..ff3eeb3595763 100644 --- a/airflow/providers/airbyte/hooks/airbyte.py +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -68,8 +68,10 @@ def wait_for_job( try: job = self.get_job(job_id=(int(job_id))) state = job.json()["job"]["status"] - except AirflowException as err: - self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err) + except AirflowException: + self.log.info( + "Retrying. Airbyte API returned server error when waiting for job.", exc_info=True + ) continue if state in (self.RUNNING, self.PENDING, self.INCOMPLETE): diff --git a/airflow/providers/alibaba/cloud/hooks/oss.py b/airflow/providers/alibaba/cloud/hooks/oss.py index 08272adb25e70..e1692a5f09361 100644 --- a/airflow/providers/alibaba/cloud/hooks/oss.py +++ b/airflow/providers/alibaba/cloud/hooks/oss.py @@ -200,8 +200,8 @@ def download_file( """ try: self.get_bucket(bucket_name).get_object_to_file(key, local_file) - except Exception as e: - self.log.error(e) + except Exception: + self.log.exception("Unable to download file.") return None return local_file @@ -221,8 +221,8 @@ def delete_object( try: self.get_bucket(bucket_name).delete_object(key) except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when deleting: {key}") + self.log.error("Errors when deleting %s", key) + raise AirflowException(e) @provide_bucket_name @unify_bucket_name_and_key @@ -240,8 +240,8 @@ def delete_objects( try: self.get_bucket(bucket_name).batch_delete_objects(key) except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when deleting: {key}") + self.log.error("Errors when deleting %s", key) + raise AirflowException(e) @provide_bucket_name def delete_bucket( @@ -256,8 +256,8 @@ def delete_bucket( try: self.get_bucket(bucket_name).delete_bucket() except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when deleting: {bucket_name}") + self.log.error("Errors when deleting %s", bucket_name) + raise AirflowException(e) @provide_bucket_name def create_bucket( @@ -272,8 +272,8 @@ def create_bucket( try: self.get_bucket(bucket_name).create_bucket() except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when create bucket: {bucket_name}") + self.log.error("Errors when creating bucket %s", bucket_name) + raise AirflowException(e) @provide_bucket_name @unify_bucket_name_and_key @@ -290,8 +290,8 @@ def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: try: self.get_bucket(bucket_name).append_object(key, pos, content) except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when append string for object: {key}") + self.log.error("Errors when appending string for object %s", key) + raise AirflowException(e) @provide_bucket_name @unify_bucket_name_and_key @@ -306,8 +306,8 @@ def read_key(self, bucket_name: Optional[str], key: str) -> str: try: return self.get_bucket(bucket_name).get_object(key).read().decode("utf-8") except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when read bucket object: {key}") + self.log.error("Errors when reading bucket object %s", key) + raise AirflowException(e) @provide_bucket_name @unify_bucket_name_and_key @@ -322,8 +322,8 @@ def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObje try: return self.get_bucket(bucket_name).head_object(key) except Exception as e: - self.log.error(e) - raise 
AirflowException(f"Errors when head bucket object: {key}") + self.log.error("Errors when head bucket object %s", key) + raise AirflowException(e) @provide_bucket_name @unify_bucket_name_and_key @@ -339,8 +339,8 @@ def key_exist(self, bucket_name: Optional[str], key: str) -> bool: try: return self.get_bucket(bucket_name).object_exists(key) except Exception as e: - self.log.error(e) - raise AirflowException(f"Errors when check bucket object existence: {key}") + self.log.error("Errors when checking bucket object existence, %s", key) + raise AirflowException(e) def get_credential(self) -> oss2.auth.Auth: extra_config = self.oss_conn.extra_dejson diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index d404a47e4a5b4..7d6e81acd4b7d 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -48,9 +48,8 @@ def hook(self): self.log.info("remote_conn_id: %s", remote_conn_id) try: return OSSHook(oss_conn_id=remote_conn_id) - except Exception as e: - self.log.error(e, exc_info=True) - self.log.error( + except Exception: + self.log.exception( 'Could not create an OSSHook with connection id "%s". ' 'Please make sure that airflow[oss] is installed and ' 'the OSS connection exists.', diff --git a/airflow/providers/amazon/aws/hooks/athena.py b/airflow/providers/amazon/aws/hooks/athena.py index 82e69ccf6f001..2744b6d3f36be 100644 --- a/airflow/providers/amazon/aws/hooks/athena.py +++ b/airflow/providers/amazon/aws/hooks/athena.py @@ -104,8 +104,8 @@ def check_query_status(self, query_execution_id: str) -> Optional[str]: state = None try: state = response['QueryExecution']['Status']['State'] - except Exception as ex: - self.log.error('Exception while getting query state %s', ex) + except Exception: + self.log.exception('Exception while getting query state.') finally: # The error is being absorbed here and is being handled by the caller. # The error is being absorbed to implement retries. @@ -122,8 +122,8 @@ def get_state_change_reason(self, query_execution_id: str) -> Optional[str]: reason = None try: reason = response['QueryExecution']['Status']['StateChangeReason'] - except Exception as ex: - self.log.error('Exception while getting query state change reason: %s', ex) + except Exception: + self.log.exception('Exception while getting query state change reason.') finally: # The error is being absorbed here and is being handled by the caller. # The error is being absorbed to implement retries. diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py index 2141b38ed8fc4..68c1b29d4986f 100644 --- a/airflow/providers/amazon/aws/hooks/emr.py +++ b/airflow/providers/amazon/aws/hooks/emr.py @@ -195,8 +195,8 @@ def get_job_failure_reason(self, job_id: str) -> Optional[str]: reason = f"{failure_reason} - {state_details}" except KeyError: self.log.error('Could not get status of the EMR on EKS job') - except ClientError as ex: - self.log.error('AWS request failed, check logs for more info: %s', ex) + except ClientError: + self.log.exception('AWS request failed, check logs for more info.') return reason @@ -216,9 +216,9 @@ def check_query_status(self, job_id: str) -> Optional[str]: except self.conn.exceptions.ResourceNotFoundException: # If the job is not found, we raise an exception as something fatal has happened. 
raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}') - except ClientError as ex: + except ClientError: # If we receive a generic ClientError, we swallow the exception so that the - self.log.error('AWS request failed, check logs for more info: %s', ex) + self.log.exception('AWS request failed, check logs for more info.') return None def poll_query_status( diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py index 9201b9f70b99a..0f05479c4a2a8 100644 --- a/airflow/providers/amazon/aws/hooks/glue.py +++ b/airflow/providers/amazon/aws/hooks/glue.py @@ -107,8 +107,8 @@ def get_iam_execution_role(self) -> Dict: glue_execution_role = iam_client.get_role(RoleName=self.role_name) self.log.info("Iam Role Name: %s", self.role_name) return glue_execution_role - except Exception as general_error: - self.log.error("Failed to create aws glue job, error: %s", general_error) + except Exception: + self.log.error("Failed to get IAM execution role for aws glue job.") raise def initialize_job( @@ -129,8 +129,8 @@ def initialize_job( job_name = self.get_or_create_glue_job() return glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs) - except Exception as general_error: - self.log.error("Failed to run aws glue job, error: %s", general_error) + except Exception: + self.log.error("Failed to run aws glue job.") raise def get_job_state(self, job_name: str, run_id: str) -> str: @@ -280,8 +280,8 @@ def get_or_create_glue_job(self) -> str: **self.create_job_kwargs, ) return create_job_response['Name'] - except Exception as general_error: - self.log.error("Failed to create aws glue job, error: %s", general_error) + except Exception: + self.log.error("Failed to create aws glue job.") raise diff --git a/airflow/providers/amazon/aws/hooks/glue_catalog.py b/airflow/providers/amazon/aws/hooks/glue_catalog.py index e77916d09e3b9..dfb068d4cf878 100644 --- a/airflow/providers/amazon/aws/hooks/glue_catalog.py +++ b/airflow/providers/amazon/aws/hooks/glue_catalog.py @@ -149,8 +149,8 @@ def get_partition(self, database_name: str, table_name: str, partition_values: L DatabaseName=database_name, TableName=table_name, PartitionValues=partition_values ) return response["Partition"] - except ClientError as e: - self.log.error("Client error: %s", e) + except ClientError: + self.log.error("Client error.") raise AirflowException("AWS request failed, check logs for more info") def create_partition(self, database_name: str, table_name: str, partition_input: Dict) -> Dict: @@ -175,8 +175,8 @@ def create_partition(self, database_name: str, table_name: str, partition_input: return self.get_conn().create_partition( DatabaseName=database_name, TableName=table_name, PartitionInput=partition_input ) - except ClientError as e: - self.log.error("Client error: %s", e) + except ClientError: + self.log.error("Client error.") raise AirflowException("AWS request failed, check logs for more info") diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py index 2058661d0a2bf..7e060038d8154 100644 --- a/airflow/providers/amazon/aws/hooks/quicksight.py +++ b/airflow/providers/amazon/aws/hooks/quicksight.py @@ -87,8 +87,8 @@ def create_ingestion( check_interval=check_interval, ) return create_ingestion_response - except Exception as general_error: - self.log.error("Failed to run Amazon QuickSight create_ingestion API, error: %s", general_error) + except Exception: + self.log.error("Failed to run Amazon 
QuickSight create_ingestion API.") raise def get_status(self, aws_account_id: str, data_set_id: str, ingestion_id: str): diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py index 9c46b78685c80..bf3f15e1e4f24 100644 --- a/airflow/providers/amazon/aws/hooks/s3.py +++ b/airflow/providers/amazon/aws/hooks/s3.py @@ -934,9 +934,9 @@ def get_bucket_tagging(self, bucket_name: Optional[str] = None) -> Optional[List result = s3_client.get_bucket_tagging(Bucket=bucket_name)['TagSet'] self.log.info("S3 Bucket Tag Info: %s", result) return result - except ClientError as e: - self.log.error(e) - raise e + except ClientError: + self.log.error("Unable to retrieve tags.") + raise @provide_bucket_name def put_bucket_tagging( @@ -969,9 +969,9 @@ def put_bucket_tagging( try: s3_client = self.get_conn() s3_client.put_bucket_tagging(Bucket=bucket_name, Tagging={'TagSet': tag_set}) - except ClientError as e: - self.log.error(e) - raise e + except ClientError: + self.log.error("Unable to apply tag(s) to bucket.") + raise @provide_bucket_name def delete_bucket_tagging(self, bucket_name: Optional[str] = None) -> None: diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py index 2c8c28a738ec3..c47fa8e680140 100644 --- a/airflow/providers/amazon/aws/hooks/sagemaker.py +++ b/airflow/providers/amazon/aws/hooks/sagemaker.py @@ -945,6 +945,6 @@ def delete_model(self, model_name: str): """ try: self.get_conn().delete_model(ModelName=model_name) - except Exception as general_error: - self.log.error("Failed to delete model, error: %s", general_error) + except Exception: + self.log.error("Failed to delete model.") raise diff --git a/airflow/providers/amazon/aws/hooks/sts.py b/airflow/providers/amazon/aws/hooks/sts.py index 78ecad74d9f54..9a186c471bfdf 100644 --- a/airflow/providers/amazon/aws/hooks/sts.py +++ b/airflow/providers/amazon/aws/hooks/sts.py @@ -35,6 +35,6 @@ def get_account_number(self) -> str: """Get the account Number""" try: return self.get_conn().get_caller_identity()['Account'] - except Exception as general_error: - self.log.error("Failed to get the AWS Account Number, error: %s", general_error) + except Exception: + self.log.error("Failed to get the AWS Account Number.") raise diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index c13ebd87e8dd6..31ad19d278c6c 100644 --- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -55,13 +55,12 @@ def hook(self): from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook return AwsLogsHook(aws_conn_id=remote_conn_id, region_name=self.region_name) - except Exception as e: - self.log.error( + except Exception: + self.log.exception( 'Could not create an AwsLogsHook with connection id "%s". ' 'Please make sure that apache-airflow[aws] is installed and ' - 'the Cloudwatch logs connection exists. 
Exception: "%s"', + 'the Cloudwatch logs connection exists.', remote_conn_id, - e, ) return None diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index ae12b2ca2f209..a73cf29025747 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -48,13 +48,12 @@ def hook(self): from airflow.providers.amazon.aws.hooks.s3 import S3Hook return S3Hook(remote_conn_id, transfer_config_args={"use_threads": False}) - except Exception as e: + except Exception: self.log.exception( 'Could not create an S3Hook with connection id "%s". ' 'Please make sure that apache-airflow[aws] is installed and ' - 'the S3 connection exists. Exception : "%s"', + 'the S3 connection exists.', remote_conn_id, - e, ) return None diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py index b3e3f90889aad..99a0b7ca1f6b2 100644 --- a/airflow/providers/amazon/aws/operators/athena.py +++ b/airflow/providers/amazon/aws/operators/athena.py @@ -121,8 +121,8 @@ def on_kill(self) -> None: http_status_code = None try: http_status_code = response['ResponseMetadata']['HTTPStatusCode'] - except Exception as ex: - self.log.error('Exception while cancelling query: %s', ex) + except Exception: + self.log.exception('Exception while cancelling query.') finally: if http_status_code is None or http_status_code != 200: self.log.error('Unable to request query cancel on athena. Exiting') diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py index ffe864d5ced2d..d819cf4e61d4c 100644 --- a/airflow/providers/amazon/aws/operators/datasync.py +++ b/airflow/providers/amazon/aws/operators/datasync.py @@ -356,8 +356,8 @@ def _execute_datasync_task(self) -> None: self.log.info("Waiting for TaskExecutionArn %s", self.task_execution_arn) try: result = hook.wait_for_task_execution(self.task_execution_arn, max_iterations=self.max_iterations) - except (AirflowTaskTimeout, AirflowException) as e: - self.log.error('Cancelling TaskExecution after Exception: %s', e) + except (AirflowTaskTimeout, AirflowException): + self.log.error('Cancelling TaskExecution after Exception') self._cancel_datasync_task_execution() raise self.log.info("Completed TaskExecutionArn %s", self.task_execution_arn) diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py index 7967fe0d139ef..fba2ccf2ce673 100644 --- a/airflow/providers/amazon/aws/operators/ecs.py +++ b/airflow/providers/amazon/aws/operators/ecs.py @@ -136,11 +136,11 @@ def _get_log_events(self, skip: int = 0) -> Generator: yield from self.hook.get_log_events(self.log_group, self.log_stream_name, skip=skip) except ClientError as error: if error.response['Error']['Code'] != 'ResourceNotFoundException': - self.logger.warning('Error on retrieving Cloudwatch log events', error) + self.logger.warning('Error on retrieving Cloudwatch log events', exc_info=True) yield from () - except ConnectionClosedError as error: - self.logger.warning('ConnectionClosedError on retrieving Cloudwatch log events', error) + except ConnectionClosedError: + self.logger.warning('ConnectionClosedError on retrieving Cloudwatch log events', exc_info=True) yield from () def _event_to_str(self, event: dict) -> str: diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 8e97015d86182..b8a6106f639ee 100644 
--- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -40,9 +40,9 @@ DEFAULT_POD_NAME = 'pod' ABORT_MSG = "{compute} are still active after the allocated time limit. Aborting." -CAN_NOT_DELETE_MSG = "A cluster can not be deleted with attached {compute}. Deleting {count} {compute}." +CAN_NOT_DELETE_MSG = "A cluster can not be deleted with attached %s. Deleting %d %s." MISSING_ARN_MSG = "Creating an {compute} requires {requirement} to be passed in." -SUCCESS_MSG = "No {compute} remain, deleting cluster." +SUCCESS_MSG = "No %s remain, deleting cluster." SUPPORTED_COMPUTE_VALUES = frozenset({'nodegroup', 'fargate'}) NODEGROUP_FULL_NAME = 'Amazon EKS managed node groups' @@ -438,7 +438,7 @@ def delete_any_nodegroups(self, eks_hook) -> None: """ nodegroups = eks_hook.list_nodegroups(clusterName=self.cluster_name) if nodegroups: - self.log.info(CAN_NOT_DELETE_MSG.format(compute=NODEGROUP_FULL_NAME, count=len(nodegroups))) + self.log.info(CAN_NOT_DELETE_MSG, NODEGROUP_FULL_NAME, len(nodegroups), NODEGROUP_FULL_NAME) for group in nodegroups: eks_hook.delete_nodegroup(clusterName=self.cluster_name, nodegroupName=group) @@ -457,7 +457,7 @@ def delete_any_nodegroups(self, eks_hook) -> None: ) else: raise RuntimeError(ABORT_MSG.format(compute=NODEGROUP_FULL_NAME)) - self.log.info(SUCCESS_MSG.format(compute=NODEGROUP_FULL_NAME)) + self.log.info(SUCCESS_MSG, NODEGROUP_FULL_NAME) def delete_any_fargate_profiles(self, eks_hook) -> None: """ @@ -468,7 +468,7 @@ def delete_any_fargate_profiles(self, eks_hook) -> None: """ fargate_profiles = eks_hook.list_fargate_profiles(clusterName=self.cluster_name) if fargate_profiles: - self.log.info(CAN_NOT_DELETE_MSG.format(compute=FARGATE_FULL_NAME, count=len(fargate_profiles))) + self.log.info(CAN_NOT_DELETE_MSG, FARGATE_FULL_NAME, len(fargate_profiles), FARGATE_FULL_NAME) for profile in fargate_profiles: # The API will return a (cluster) ResourceInUseException if you try # to delete Fargate profiles in parallel the way we can with nodegroups, @@ -493,7 +493,7 @@ def delete_any_fargate_profiles(self, eks_hook) -> None: ) else: raise RuntimeError(ABORT_MSG.format(compute=FARGATE_FULL_NAME)) - self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME)) + self.log.info(SUCCESS_MSG, FARGATE_FULL_NAME) class EksDeleteNodegroupOperator(BaseOperator): diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 4e8a1d96c9895..bdba608cc7d41 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -228,8 +228,8 @@ def on_kill(self) -> None: http_status_code = None try: http_status_code = response["ResponseMetadata"]["HTTPStatusCode"] - except Exception as ex: - self.log.error("Exception while cancelling query: %s", ex) + except Exception: + self.log.exception("Exception while cancelling query.") finally: if http_status_code is None or http_status_code != 200: self.log.error("Unable to request query cancel on EMR. 
Exiting") diff --git a/airflow/providers/amazon/aws/operators/redshift_data.py b/airflow/providers/amazon/aws/operators/redshift_data.py index a2400f94bdbfa..e59ac5d8c3bc8 100644 --- a/airflow/providers/amazon/aws/operators/redshift_data.py +++ b/airflow/providers/amazon/aws/operators/redshift_data.py @@ -153,5 +153,5 @@ def on_kill(self) -> None: try: self.hook.conn.cancel_statement(Id=self.statement_id) - except Exception as ex: - self.log.error('Unable to cancel query. Exiting. %s', ex) + except Exception: + self.log.exception('Unable to cancel query. Exiting.') diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py index dc27a80237d58..712450894057e 100644 --- a/airflow/providers/apache/hdfs/hooks/webhdfs.py +++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py @@ -90,8 +90,8 @@ def _find_valid_server(self) -> Any: return client else: self.log.warning("Could not connect to %s:%s", namenode, connection.port) - except HdfsError as hdfs_error: - self.log.info('Read operation on namenode %s failed with error: %s', namenode, hdfs_error) + except HdfsError: + self.log.info('Read operation on namenode %s failed.', namenode, exc_info=True) return None def _get_client( diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index ef2ebed6ae0ef..e3fc26ac90e40 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -661,5 +661,5 @@ def __exit__(self, exctype, excinst, exctb): if caught_error: self.exception = excinst logger = logging.getLogger(__name__) - logger.error(str(excinst), exc_info=True) + logger.exception(str(excinst)) return caught_error diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py index 15ac40bcdb90a..4d18fc951fc86 100644 --- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py @@ -85,13 +85,12 @@ def _log_driver(self, application_state: str, response: dict) -> None: for line in self.hook.get_pod_logs(driver_pod_name, namespace=namespace): log += line.decode() log_method(log) - except client.rest.ApiException as e: + except client.rest.ApiException: self.log.warning( "Could not read logs for pod %s. It may have been disposed.\n" - "Make sure timeToLiveSeconds is set on your SparkApplication spec.\n" - "underlying exception: %s", + "Make sure timeToLiveSeconds is set on your SparkApplication spec.", driver_pod_name, - e, + exc_info=True, ) def poke(self, context: 'Context') -> bool: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 52b7113a5904d..b6d55976af5d4 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -224,11 +224,11 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) line = raw_line.decode('utf-8', errors="backslashreplace") timestamp, message = self.parse_log_line(line) self.log.info(message) - except BaseHTTPError as e: + except BaseHTTPError: self.log.warning( - "Reading of logs interrupted with error %r; will retry. " + "Reading of logs interrupted; will retry. 
" "Set log level to DEBUG for traceback.", - e, + exc_info=True, ) self.log.debug( "Traceback for interrupted logs read for pod %r", diff --git a/airflow/providers/docker/hooks/docker.py b/airflow/providers/docker/hooks/docker.py index c5c339c85fd5a..d056991df15ad 100644 --- a/airflow/providers/docker/hooks/docker.py +++ b/airflow/providers/docker/hooks/docker.py @@ -107,5 +107,5 @@ def __login(self, client) -> None: ) self.log.debug('Login successful') except APIError as docker_error: - self.log.error('Docker login failed: %s', str(docker_error)) - raise AirflowException(f'Docker login failed: {docker_error}') + self.log.error('Docker login failed.') + raise AirflowException(docker_error) diff --git a/airflow/providers/ftp/sensors/ftp.py b/airflow/providers/ftp/sensors/ftp.py index faa9c5c315af1..8d67a5f0cbab1 100644 --- a/airflow/providers/ftp/sensors/ftp.py +++ b/airflow/providers/ftp/sensors/ftp.py @@ -74,7 +74,7 @@ def poke(self, context: 'Context') -> bool: self.log.info('Found File %s last modified: %s', str(self.path), str(mod_time)) except ftplib.error_perm as e: - self.log.error('Ftp error encountered: %s', str(e)) + self.log.exception('Ftp error encountered.') error_code = self._get_error_code(e) if (error_code != 550) and ( self.fail_on_transient_errors or (error_code not in self.transient_errors) diff --git a/airflow/providers/google/ads/hooks/ads.py b/airflow/providers/google/ads/hooks/ads.py index 993a4970df2cb..5a426e2911c07 100644 --- a/airflow/providers/google/ads/hooks/ads.py +++ b/airflow/providers/google/ads/hooks/ads.py @@ -164,8 +164,8 @@ def _get_client(self) -> GoogleAdsClient: try: client = GoogleAdsClient.load_from_dict(self.google_ads_config) return client - except GoogleAuthError as e: - self.log.error("Google Auth Error: %s", e) + except GoogleAuthError: + self.log.error("Google Auth Error!") raise @cached_property @@ -177,8 +177,8 @@ def _get_customer_service(self) -> CustomerServiceClient: try: client = GoogleAdsClient.load_from_dict(self.google_ads_config) return client.get_service("CustomerService", version=self.api_version) - except GoogleAuthError as e: - self.log.error("Google Auth Error: %s", e) + except GoogleAuthError: + self.log.error("Google Auth Error!") raise def _get_config(self) -> None: diff --git a/airflow/providers/google/ads/transfers/ads_to_gcs.py b/airflow/providers/google/ads/transfers/ads_to_gcs.py index ffce93940c0d8..4f9d9343f790e 100644 --- a/airflow/providers/google/ads/transfers/ads_to_gcs.py +++ b/airflow/providers/google/ads/transfers/ads_to_gcs.py @@ -111,8 +111,8 @@ def execute(self, context: 'Context') -> None: try: getter = attrgetter(*self.attributes) converted_rows = [getter(row) for row in rows] - except Exception as e: - self.log.error("An error occurred in converting the Google Ad Rows. \n Error %s", e) + except Exception: + self.log.error("An error occurred in converting the Google Ad Rows.") raise with NamedTemporaryFile("w", suffix=".csv") as csvfile: diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index 0049143aea461..b0729bbb2a541 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -168,13 +168,13 @@ def get_sqlalchemy_engine(self, engine_kwargs=None): # ADC uses the service account that is attached to the resource that is running your code. 
return create_engine(self.get_uri(), **engine_kwargs) except Exception as e: - self.log.error(e) - raise AirflowException( + self.log.error( "For now, we only support instantiating SQLAlchemy engine by" " using ADC" ", extra__google_cloud_platform__key_path" "and extra__google_cloud_platform__keyfile_dict" ) + raise AirflowException(e) def get_records(self, sql, parameters=None): if self.location is None: diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index d870d80726669..b85c895903036 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -689,8 +689,10 @@ def wait_for_job( try: job = self.get_job(project_id=project_id, region=region, job_id=job_id) state = job.status.state - except ServerError as err: - self.log.info("Retrying. Dataproc API returned server error when waiting for job: %s", err) + except ServerError: + self.log.info( + "Retrying. Dataproc API returned server error when waiting for job.", exc_info=True + ) if state == JobStatus.State.ERROR: raise AirflowException(f'Job failed:\n{job}') diff --git a/airflow/providers/google/cloud/hooks/mlengine.py b/airflow/providers/google/cloud/hooks/mlengine.py index b1b6e83918aa4..188f33179f10a 100644 --- a/airflow/providers/google/cloud/hooks/mlengine.py +++ b/airflow/providers/google/cloud/hooks/mlengine.py @@ -139,7 +139,7 @@ def create_job(self, job: dict, project_id: str, use_existing_job_fn: Optional[C raise self.log.info('Job with job_id %s already exist. Will waiting for it to finish', job_id) else: - self.log.error('Failed to create MLEngine job: %s', e) + self.log.error('Failed to create MLEngine job.') raise return self._wait_for_job_done(project_id, job_id) @@ -176,7 +176,7 @@ def cancel_job( self.log.info('Job with job_id %s is already complete, cancellation aborted.', job_id) return {} else: - self.log.error('Failed to cancel MLEngine job: %s', e) + self.log.error('Failed to cancel MLEngine job.') raise def _get_job(self, project_id: str, job_id: str) -> dict: @@ -201,7 +201,7 @@ def _get_job(self, project_id: str, job_id: str) -> dict: # polling after 30 seconds when quota failure occurs time.sleep(30) else: - self.log.error('Failed to get MLEngine job: %s', e) + self.log.error('Failed to get MLEngine job.') raise def _wait_for_job_done(self, project_id: str, job_id: str, interval: int = 30): @@ -293,8 +293,8 @@ def set_default_version( response = request.execute(num_retries=self.num_retries) self.log.info('Successfully set version: %s to default', response) return response - except HttpError as e: - self.log.error('Something went wrong: %s', e) + except HttpError: + self.log.error('Something went wrong!') raise @GoogleBaseHook.fallback_to_default_project_id @@ -441,7 +441,7 @@ def get_model( return request.execute(num_retries=self.num_retries) except HttpError as e: if e.resp.status == 404: - self.log.error('Model was not found: %s', e) + self.log.exception('Model was not found.') return None raise @@ -475,7 +475,7 @@ def delete_model( request.execute(num_retries=self.num_retries) except HttpError as e: if e.resp.status == 404: - self.log.error('Model was not found: %s', e) + self.log.exception('Model was not found.') return raise diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 0b4c0e09f27d8..b94cddb0a1fbf 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ 
b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -169,10 +169,10 @@ def gcs_write(self, log, remote_log_location): except Exception as e: if not hasattr(e, 'resp') or e.resp.get('status') != '404': log = f'*** Previous log discarded: {str(e)}\n\n' + log - self.log.info("Previous log discarded: %s", e) + self.log.info("Previous log discarded.", exc_info=True) try: blob = storage.Blob.from_string(remote_log_location, self.client) blob.upload_from_string(log, content_type="text/plain") - except Exception as e: - self.log.error('Could not write logs to %s: %s', remote_log_location, e) + except Exception: + self.log.exception('Could not write logs to %s', remote_log_location) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 827f82f5f6b3f..f365c438a474a 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2176,7 +2176,7 @@ def execute(self, context: Any): ) try: - self.log.info("Executing: %s'", self.configuration) + self.log.info("Executing: %s", self.configuration) job = self._submit_job(hook, job_id) self._handle_job_error(job) except Conflict: diff --git a/airflow/providers/google/cloud/operators/translate.py b/airflow/providers/google/cloud/operators/translate.py index 40bd4dda6e981..64ca91836e2d6 100644 --- a/airflow/providers/google/cloud/operators/translate.py +++ b/airflow/providers/google/cloud/operators/translate.py @@ -127,6 +127,5 @@ def execute(self, context: 'Context') -> dict: self.log.debug("Translation %s", translation) return translation except ValueError as e: - self.log.error('An error has been thrown from translate method:') - self.log.error(e) + self.log.error('An error has been thrown from translate method.') raise AirflowException(e) diff --git a/airflow/providers/google/cloud/operators/translate_speech.py b/airflow/providers/google/cloud/operators/translate_speech.py index 7b2ef448c3d13..8a82f9d53c4bd 100644 --- a/airflow/providers/google/cloud/operators/translate_speech.py +++ b/airflow/providers/google/cloud/operators/translate_speech.py @@ -183,5 +183,4 @@ def execute(self, context: 'Context') -> dict: return translation except ValueError as e: self.log.error('An error has been thrown from translate speech method:') - self.log.error(e) raise AirflowException(e) diff --git a/airflow/providers/google/cloud/sensors/dataproc.py b/airflow/providers/google/cloud/sensors/dataproc.py index 02b2d5e14d7ab..b393f973dc800 100644 --- a/airflow/providers/google/cloud/sensors/dataproc.py +++ b/airflow/providers/google/cloud/sensors/dataproc.py @@ -78,7 +78,7 @@ def poke(self, context: "Context") -> bool: job = hook.get_job( job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id ) - except ServerError as err: + except ServerError: duration = self._duration() self.log.info("DURATION RUN: %f", duration) if duration > self.wait_timeout: @@ -86,7 +86,9 @@ def poke(self, context: "Context") -> bool: f"Timeout: dataproc job {self.dataproc_job_id} " f"is not ready after {self.wait_timeout}s" ) - self.log.info("Retrying. Dataproc API returned server error when waiting for job: %s", err) + self.log.info( + "Retrying. 
Dataproc API returned server error when waiting for job.", exc_info=True + ) return False else: job = hook.get_job(job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id) diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py index 4c4ab5c888c6f..a31a2e7c6f4a1 100644 --- a/airflow/providers/google/common/hooks/base_google.py +++ b/airflow/providers/google/common/hooks/base_google.py @@ -291,12 +291,12 @@ def _get_credentials_email(self) -> str: if isinstance(credentials, compute_engine.Credentials): try: credentials.refresh(_http_client.Request()) - except RefreshError as msg: + except RefreshError: """ If the Compute Engine metadata service can't be reached in this case the instance has not credentials. """ - self.log.debug(msg) + self.log.debug("Refreshing the credentials' access token has failed.", exc_info=True) service_account_email = getattr(credentials, 'service_account_email', None) if service_account_email: diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 373cd866c02ad..eb6b0635d6d1b 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -203,7 +203,7 @@ def run_and_check( return response except requests.exceptions.ConnectionError as ex: - self.log.warning('%s Tenacity will retry to execute the operation', ex) + self.log.warning('Tenacity will retry to execute the operation.') raise ex def run_with_advanced_retry(self, _retry_args: Dict[Any, Any], *args: Any, **kwargs: Any) -> Any: diff --git a/airflow/providers/jira/sensors/jira.py b/airflow/providers/jira/sensors/jira.py index 3a2cb8edddcef..06493852cc5cd 100644 --- a/airflow/providers/jira/sensors/jira.py +++ b/airflow/providers/jira/sensors/jira.py @@ -124,8 +124,8 @@ def issue_field_checker(self, issue: Issue) -> Optional[bool]: self.field, ) - except JIRAError as jira_error: - self.log.error("Jira error while checking with expected value: %s", jira_error) + except JIRAError: + self.log.exception("Jira error while checking with expected value.") except Exception: self.log.exception("Error while checking with expected value %s:", self.expected_value) if result is True: diff --git a/airflow/providers/microsoft/azure/hooks/wasb.py b/airflow/providers/microsoft/azure/hooks/wasb.py index ad8cf2e7a0400..d36ffd48295df 100644 --- a/airflow/providers/microsoft/azure/hooks/wasb.py +++ b/airflow/providers/microsoft/azure/hooks/wasb.py @@ -382,8 +382,8 @@ def create_container(self, container_name: str) -> None: e.response, self.conn_id, ) - except Exception as e: - self.log.info('Error while attempting to create container %r: %s', container_name, e) + except Exception: + self.log.info('Error while attempting to create container %r.', container_name) raise def delete_container(self, container_name: str) -> None: diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index 47a6e824bc50d..00396d4cb85a0 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -139,8 +139,8 @@ def wasb_log_exists(self, remote_log_location: str) -> bool: try: return self.hook.check_for_blob(self.wasb_container, remote_log_location) - except Exception as e: - self.log.debug('Exception when trying to check remote location: "%s"', e) + except Exception: + self.log.debug('Exception when trying to check remote location.', exc_info=True) 
return False def wasb_read(self, remote_log_location: str, return_error: bool = False): diff --git a/airflow/providers/microsoft/azure/secrets/key_vault.py b/airflow/providers/microsoft/azure/secrets/key_vault.py index 19eacf010c009..5e8f72987f003 100644 --- a/airflow/providers/microsoft/azure/secrets/key_vault.py +++ b/airflow/providers/microsoft/azure/secrets/key_vault.py @@ -181,6 +181,6 @@ def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]: try: secret = self.client.get_secret(name=name) return secret.value - except ResourceNotFoundError as ex: - self.log.debug('Secret %s not found: %s', name, ex) + except ResourceNotFoundError: + self.log.debug('Secret %s not found.', name, exc_info=True) return None diff --git a/airflow/providers/opsgenie/hooks/opsgenie.py b/airflow/providers/opsgenie/hooks/opsgenie.py index 602ca4e7a5cae..ba4c384b8a8f1 100644 --- a/airflow/providers/opsgenie/hooks/opsgenie.py +++ b/airflow/providers/opsgenie/hooks/opsgenie.py @@ -159,6 +159,6 @@ def delete_alert( source=source, ) return api_response - except OpenApiException as e: - self.log.exception('Exception when calling AlertApi->delete_alert: %s\n', e) - raise e + except OpenApiException: + self.log.error('Exception when calling AlertApi->delete_alert:') + raise diff --git a/airflow/providers/qubole/sensors/qubole.py b/airflow/providers/qubole/sensors/qubole.py index 1d1fdb62eda41..882eb497d4b05 100644 --- a/airflow/providers/qubole/sensors/qubole.py +++ b/airflow/providers/qubole/sensors/qubole.py @@ -57,8 +57,10 @@ def poke(self, context: 'Context') -> bool: status = False try: status = self.sensor_class.check(self.data) # type: ignore[attr-defined] - except Exception as e: - self.log.exception(e) + except Exception: + self.log.exception( + "Exception occurred while poking for status. Will attempt to poke for status again later." + ) status = False self.log.info('Status of this Poke: %s', status) diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py index 8b23218c8cc39..30905df51c146 100644 --- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py @@ -18,7 +18,7 @@ import time from datetime import datetime as dt from unittest import mock -from unittest.mock import ANY, call +from unittest.mock import call import pytest from watchtower import CloudWatchLogHandler @@ -114,9 +114,9 @@ def test_hook_raises(self): mock_error.assert_called_once_with( 'Could not create an AwsLogsHook with connection id "%s". Please make ' 'sure that apache-airflow[aws] is installed and the Cloudwatch ' - 'logs connection exists. Exception: "%s"', + 'logs connection exists.', 'aws_default', - ANY, + exc_info=True, ) def test_handler(self): diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py index 958ded562edfc..a0f1e50a3af52 100644 --- a/tests/providers/amazon/aws/log/test_s3_task_handler.py +++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py @@ -19,7 +19,6 @@ import contextlib import os from unittest import mock -from unittest.mock import ANY import pytest from botocore.exceptions import ClientError @@ -107,9 +106,8 @@ def test_hook_raises(self): mock_error.assert_called_once_with( 'Could not create an S3Hook with connection id "%s". Please make ' - 'sure that apache-airflow[aws] is installed and the S3 connection exists. 
Exception : "%s"', + 'sure that apache-airflow[aws] is installed and the S3 connection exists.', 'aws_default', - ANY, exc_info=True, ) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index 88e0eeb07e1c0..13e57d4ecdfaf 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -895,9 +895,9 @@ def test_patch_core_settings(self, key, value, attr, patched_value): def test__suppress(): - with mock.patch('logging.Logger.error') as mock_error: + with mock.patch('logging.Logger.exception') as mock_exception: with _suppress(ValueError): raise ValueError("failure") - mock_error.assert_called_once_with("failure", exc_info=True) + mock_exception.assert_called_once_with("failure") diff --git a/tests/providers/google/cloud/hooks/test_mlengine.py b/tests/providers/google/cloud/hooks/test_mlengine.py index f06a89701dc0c..4202b2c53ccc7 100644 --- a/tests/providers/google/cloud/hooks/test_mlengine.py +++ b/tests/providers/google/cloud/hooks/test_mlengine.py @@ -470,7 +470,7 @@ def test_delete_model_when_not_exists(self, mock_get_conn, mock_log): mock.call().projects().models().delete().execute(num_retries=5), ] ) - mock_log.error.assert_called_once_with('Model was not found: %s', http_error) + mock_log.exception.assert_called_once_with('Model was not found.') @mock.patch("airflow.providers.google.cloud.hooks.mlengine.time.sleep") @mock.patch("airflow.providers.google.cloud.hooks.mlengine.MLEngineHook.get_conn") diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index b443a9f8ec8ef..194f500ba7949 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -172,7 +172,7 @@ def test_failed_write_to_remote_on_close(self, mock_blob, mock_client, mock_cred ( "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler", logging.ERROR, - "Could not write logs to gs://bucket/remote/log/location/1.log: Failed to connect", + "Could not write logs to gs://bucket/remote/log/location/1.log", ), ] mock_blob.assert_has_calls(