From 0bec1e7d4cd16e11febec56513ed01ffd2a2b379 Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Fri, 21 Jun 2024 16:39:17 +0200 Subject: [PATCH] fix: provide stack trace under proper key in OL facet Signed-off-by: Kacper Muda --- airflow/providers/openlineage/plugins/adapter.py | 12 +++++++++--- tests/providers/openlineage/plugins/test_adapter.py | 4 ++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index 339ad55fdd2ce..348f83ee6a193 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -295,11 +295,17 @@ def fail_task( """ error_facet = {} if error: - if isinstance(error, BaseException): + stack_trace = None + if isinstance(error, BaseException) and error.__traceback__: import traceback - error = "\\n".join(traceback.format_exception(type(error), error, error.__traceback__)) - error_facet = {"errorMessage": ErrorMessageRunFacet(message=error, programmingLanguage="python")} + stack_trace = "\\n".join(traceback.format_exception(type(error), error, error.__traceback__)) + + error_facet = { + "errorMessage": ErrorMessageRunFacet( + message=str(error), programmingLanguage="python", stackTrace=stack_trace + ) + } event = RunEvent( eventType=RunState.FAIL, diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index 1242bb3ef98b0..d548e03a91be9 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -457,6 +457,7 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta run_facets={"externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source")}, job_facets={"sql": SqlJobFacet(query="SELECT 1;")}, ), + error=ValueError("Error message"), ) assert ( @@ -476,6 +477,9 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta job={"namespace": namespace(), "name": "parent_job_name"}, ), "externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"), + "errorMessage": ErrorMessageRunFacet( + message="Error message", programmingLanguage="python", stackTrace=None + ), }, ), job=Job(