diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 773552184f103..65900276271a6 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -965,12 +965,14 @@ def __init__( category=RemovedInAirflow3Warning, stacklevel=3, ) - validate_key(task_id) dag = dag or DagContext.get_current_dag() task_group = task_group or TaskGroupContext.get_current_task_group(dag) self.task_id = task_group.child_id(task_id) if task_group else task_id + + validate_key(self.task_id) + if not self.__from_mapped and task_group: task_group.add(self) diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 48aaf2699b918..8ce9ca195e913 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -564,6 +564,33 @@ def test_chain(self): assert [op2] == tgop3.get_direct_relatives(upstream=False) assert [op2] == tgop4.get_direct_relatives(upstream=False) + def test_baseoperator_raises_exception_when_task_id_plus_taskgroup_id_exceeds_250_chars(self): + """Test exception is raised when operator task id + taskgroup id > 250 chars.""" + dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now()) + + tg1 = TaskGroup("A" * 20, dag=dag) + with pytest.raises(AirflowException, match="The key has to be less than 250 characters"): + BaseOperator(task_id="1" * 250, task_group=tg1, dag=dag) + + def test_baseoperator_with_task_id_and_taskgroup_id_less_than_250_chars(self): + """Test exception is not raised when operator task id + taskgroup id < 250 chars.""" + dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now()) + + tg1 = TaskGroup("A" * 10, dag=dag) + try: + BaseOperator(task_id="1" * 239, task_group=tg1, dag=dag) + except Exception as e: + pytest.fail(f"Exception raised: {e}") + + def test_baseoperator_with_task_id_less_than_250_chars(self): + """Test exception is not raised when operator task id < 250 chars.""" + dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now()) + + try: + BaseOperator(task_id="1" * 249, dag=dag) + except Exception as e: + pytest.fail(f"Exception raised: {e}") + def test_chain_linear(self): dag = DAG(dag_id="test_chain_linear", schedule=None, start_date=datetime.now())