diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index 9b1f64d970e64..aaaee72a25df1 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -732,7 +732,7 @@ def __init__( f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). " f"Invalid arguments were:\n**kwargs: {kwargs}", ) - validate_key(task_id) + validate_key(self.task_id) self.owner = owner self.email = email diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index e95866d95a5e9..2c598edc777ac 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -395,6 +395,33 @@ def test_chain(self): assert [op2] == tgop3.get_direct_relatives(upstream=False) assert [op2] == tgop4.get_direct_relatives(upstream=False) + def test_baseoperator_raises_exception_when_task_id_plus_taskgroup_id_exceeds_250_chars(self): + """Test exception is raised when operator task id + taskgroup id > 250 chars.""" + dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now()) + + tg1 = TaskGroup("A" * 20, dag=dag) + with pytest.raises(ValueError, match="The key has to be less than 250 characters"): + BaseOperator(task_id="1" * 250, task_group=tg1, dag=dag) + + def test_baseoperator_with_task_id_and_taskgroup_id_less_than_250_chars(self): + """Test exception is not raised when operator task id + taskgroup id < 250 chars.""" + dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now()) + + tg1 = TaskGroup("A" * 10, dag=dag) + try: + BaseOperator(task_id="1" * 239, task_group=tg1, dag=dag) + except Exception as e: + pytest.fail(f"Exception raised: {e}") + + def test_baseoperator_with_task_id_less_than_250_chars(self): + """Test exception is not raised when operator task id < 250 chars.""" + dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now()) + + try: + BaseOperator(task_id="1" * 249, dag=dag) + except Exception as e: + pytest.fail(f"Exception raised: {e}") + def test_chain_linear(self): dag = DAG(dag_id="test_chain_linear", schedule=None, start_date=datetime.now())