From 345b8de512619fd742dc45fd985278224cb416ce Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Tue, 14 Mar 2023 20:00:47 -0700 Subject: [PATCH 1/4] Allow setting the DagRun state in create DAGRun API --- .../endpoints/dag_run_endpoint.py | 2 +- airflow/api_connexion/openapi/v1.yaml | 2 +- .../api_connexion/schemas/dag_run_schema.py | 2 +- .../endpoints/test_dag_run_endpoint.py | 36 ++++++++++++++----- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 945a5eccb769d..62199f52d35a0 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -338,7 +338,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: run_id=run_id, execution_date=logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date), - state=DagRunState.QUEUED, + state=post_body.get("state"), conf=post_body.get("conf"), external_trigger=True, dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id), diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index b8515a00c9292..b7dde8a201afa 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2805,7 +2805,7 @@ components: - dataset_triggered state: $ref: '#/components/schemas/DagState' - readOnly: true + default: queued external_trigger: type: boolean default: true diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index d9cecb9b0b137..19b05ced7f835 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -66,7 +66,7 @@ class Meta: execution_date = auto_field(data_key="logical_date", validate=validate_istimezone) start_date = auto_field(dump_only=True) end_date = auto_field(dump_only=True) - state = DagStateField(dump_only=True) + state = DagStateField(dump_only=False) external_trigger = auto_field(dump_default=True, dump_only=True) conf = ConfObject() data_interval_start = auto_field(dump_only=True) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 09ecb9e497142..c643786c5efcc 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -1034,14 +1034,19 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids): class TestPostDagRun(TestDagRunEndpoint): @pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"]) @pytest.mark.parametrize( - "dag_run_id, logical_date, note", + "dag_run_id, logical_date, note, state", [ - pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"), - pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"), - pytest.param(None, None, None, id="all-missing"), + pytest.param( + "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", "queued", id="all-present" + ), + pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, id="only-date"), + pytest.param(None, None, None, None, id="all-missing"), + pytest.param(None, None, None, "failed", id="create-failed"), ], ) - def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note): + def test_should_respond_200( + self, session, logical_date_field_name, dag_run_id, logical_date, note, state + ): self._create_dag("TEST_DAG_ID") # We'll patch airflow.utils.timezone.utcnow to always return this so we @@ -1053,6 +1058,8 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, request_json[logical_date_field_name] = logical_date if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id + if state is not None: + request_json["state"] = state request_json["note"] = note with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now): response = self.client.post( @@ -1070,16 +1077,24 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, expected_dag_run_id = f"manual__{expected_logical_date}" else: expected_dag_run_id = dag_run_id + if state is None: + expected_state = "queued" + else: + expected_state = state + if state in ("success", "failed"): + expected_end_date = expected_logical_date + else: + expected_end_date = None assert response.json == { "conf": {}, "dag_id": "TEST_DAG_ID", "dag_run_id": expected_dag_run_id, - "end_date": None, + "end_date": expected_end_date, "execution_date": expected_logical_date, "logical_date": expected_logical_date, "external_trigger": True, "start_date": None, - "state": "queued", + "state": expected_state, "data_interval_end": expected_logical_date, "data_interval_start": expected_logical_date, "last_scheduling_decision": None, @@ -1246,9 +1261,12 @@ def test_response_404(self): ), pytest.param( "api/v1/dags/TEST_DAG_ID/dagRuns", - {"state": "failed", "execution_date": "2020-06-12T18:00:00+00:00"}, { - "detail": "Property is read-only - 'state'", + "last_scheduling_decision": "2020-06-12T18:00:00+00:00", + "execution_date": "2020-06-12T18:00:00+00:00", + }, + { + "detail": "Property is read-only - 'last_scheduling_decision'", "status": 400, "title": "Bad Request", "type": EXCEPTIONS_LINK_MAP[400], From 94cfb572a434dba2d2808c38665d2d62746203a7 Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Tue, 14 Mar 2023 20:56:14 -0700 Subject: [PATCH 2/4] Update airflow/api_connexion/schemas/dag_run_schema.py Co-authored-by: Tzu-ping Chung --- airflow/api_connexion/schemas/dag_run_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index 19b05ced7f835..abd9b5cbc6dcd 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -66,7 +66,7 @@ class Meta: execution_date = auto_field(data_key="logical_date", validate=validate_istimezone) start_date = auto_field(dump_only=True) end_date = auto_field(dump_only=True) - state = DagStateField(dump_only=False) + state = DagStateField() external_trigger = auto_field(dump_default=True, dump_only=True) conf = ConfObject() data_interval_start = auto_field(dump_only=True) From 7ad5e4c76d29d93ec16c51007012168b52119d3c Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Wed, 15 Mar 2023 21:09:00 -0700 Subject: [PATCH 3/4] Update airflow/api_connexion/endpoints/dag_run_endpoint.py Co-authored-by: Tzu-ping Chung --- airflow/api_connexion/endpoints/dag_run_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 62199f52d35a0..e8ed58210bf01 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -338,7 +338,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: run_id=run_id, execution_date=logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date), - state=post_body.get("state"), + state=post_body.get("state", DagRunState.QUEUED), conf=post_body.get("conf"), external_trigger=True, dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id), From f020847dc40739a307ca2c1a704439bae6812fd5 Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Wed, 15 Mar 2023 21:44:25 -0700 Subject: [PATCH 4/4] Adding test for 400 on invalid state --- .../endpoints/test_dag_run_endpoint.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index c643786c5efcc..e84b33d85780b 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -1228,6 +1228,27 @@ def test_should_response_400_for_non_dict_dagrun_conf(self, data, expected): assert response.status_code == 400 assert response.json["detail"] == expected + @pytest.mark.parametrize( + "data, expected", + [ + ( + { + "dag_run_id": "TEST_DAG_RUN", + "execution_date": "2020-06-11T18:00:00+00:00", + "state": "queueueued", + }, + "'queueueued' is not one of ['queued', 'running', 'success', 'failed'] - 'state'", + ) + ], + ) + def test_should_response_400_for_invalid_state(self, data, expected): + self._create_dag("TEST_DAG_ID") + response = self.client.post( + "api/v1/dags/TEST_DAG_ID/dagRuns", json=data, environ_overrides={"REMOTE_USER": "test"} + ) + assert response.status_code == 400 + assert response.json["detail"] == expected + def test_response_404(self): response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns",