From 0b6eb8fd9cdfd52f8f477d7e6188d08789ea9aa5 Mon Sep 17 00:00:00 2001 From: howardyoo Date: Thu, 15 Aug 2024 09:12:04 -0500 Subject: [PATCH 1/2] Fix for issue #39336 --- airflow/executors/base_executor.py | 2 +- airflow/executors/local_executor.py | 2 +- airflow/executors/sequential_executor.py | 2 +- airflow/jobs/scheduler_job_runner.py | 2 +- airflow/traces/utils.py | 6 +----- 5 files changed, 5 insertions(+), 9 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 2c5cbc4b574e7..4042377cfaab2 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -467,7 +467,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None: span.set_attribute("dag_id", key.dag_id) span.set_attribute("run_id", key.run_id) span.set_attribute("task_id", key.task_id) - span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("try_number", key.try_number) self.change_state(key, TaskInstanceState.SUCCESS, info) diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 81b8ad76288b4..f28e525ec3ac5 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -277,7 +277,7 @@ def execute_async( span.set_attribute("dag_id", key.dag_id) span.set_attribute("run_id", key.run_id) span.set_attribute("task_id", key.task_id) - span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("try_number", key.try_number) span.set_attribute("commands_to_run", str(command)) local_worker = LocalWorker(self.executor.result_queue, key=key, command=command) diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 98fc901a132a2..0b4cbdea9dd42 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -76,7 +76,7 @@ def execute_async( span.set_attribute("dag_id", key.dag_id) span.set_attribute("run_id", key.run_id) span.set_attribute("task_id", key.task_id) - span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("try_number", key.try_number) span.set_attribute("commands_to_run", str(self.commands_to_run)) def sync(self) -> None: diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 44938c91b1f80..497e1d045e0b6 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -837,7 +837,7 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) -> span.set_attribute("hostname", ti.hostname) span.set_attribute("log_url", ti.log_url) span.set_attribute("operator", str(ti.operator)) - span.set_attribute("try_number", ti.try_number - 1) + span.set_attribute("try_number", ti.try_number) span.set_attribute("executor_state", state) span.set_attribute("job_id", ti.job_id) span.set_attribute("pool", ti.pool) diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py index afab2591d5146..8fe8a32aea106 100644 --- a/airflow/traces/utils.py +++ b/airflow/traces/utils.py @@ -75,12 +75,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int: def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int: """Generate span id from the task instance.""" dag_run = ti.dag_run - if ti.state == TaskInstanceState.SUCCESS or ti.state == TaskInstanceState.FAILED: - try_number = ti.try_number - 1 - else: - try_number = ti.try_number return _gen_id( - [dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)], + [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)], as_int, SPAN_ID, ) From 8e2fd289e6f4894ba9dc3a2525023283e3fb3d0c Mon Sep 17 00:00:00 2001 From: howardyoo Date: Thu, 15 Aug 2024 10:04:06 -0500 Subject: [PATCH 2/2] removed unnecessary import --- airflow/traces/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py index 8fe8a32aea106..9932c249f0772 100644 --- a/airflow/traces/utils.py +++ b/airflow/traces/utils.py @@ -22,7 +22,6 @@ from airflow.traces import NO_TRACE_ID from airflow.utils.hashlib_wrapper import md5 -from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: from airflow.models import DagRun, TaskInstance