diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 945a5eccb769d..e8ed58210bf01 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -338,7 +338,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: run_id=run_id, execution_date=logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date), - state=DagRunState.QUEUED, + state=post_body.get("state", DagRunState.QUEUED), conf=post_body.get("conf"), external_trigger=True, dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id), diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index b8515a00c9292..b7dde8a201afa 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2805,7 +2805,7 @@ components: - dataset_triggered state: $ref: '#/components/schemas/DagState' - readOnly: true + default: queued external_trigger: type: boolean default: true diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index d9cecb9b0b137..abd9b5cbc6dcd 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -66,7 +66,7 @@ class Meta: execution_date = auto_field(data_key="logical_date", validate=validate_istimezone) start_date = auto_field(dump_only=True) end_date = auto_field(dump_only=True) - state = DagStateField(dump_only=True) + state = DagStateField() external_trigger = auto_field(dump_default=True, dump_only=True) conf = ConfObject() data_interval_start = auto_field(dump_only=True) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 09ecb9e497142..e84b33d85780b 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -1034,14 +1034,19 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids): class TestPostDagRun(TestDagRunEndpoint): @pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"]) @pytest.mark.parametrize( - "dag_run_id, logical_date, note", + "dag_run_id, logical_date, note, state", [ - pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"), - pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"), - pytest.param(None, None, None, id="all-missing"), + pytest.param( + "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", "queued", id="all-present" + ), + pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, id="only-date"), + pytest.param(None, None, None, None, id="all-missing"), + pytest.param(None, None, None, "failed", id="create-failed"), ], ) - def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note): + def test_should_respond_200( + self, session, logical_date_field_name, dag_run_id, logical_date, note, state + ): self._create_dag("TEST_DAG_ID") # We'll patch airflow.utils.timezone.utcnow to always return this so we @@ -1053,6 +1058,8 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, request_json[logical_date_field_name] = logical_date if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id + if state is not None: + request_json["state"] = state request_json["note"] = note with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now): response = self.client.post( @@ -1070,16 +1077,24 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, expected_dag_run_id = f"manual__{expected_logical_date}" else: expected_dag_run_id = dag_run_id + if state is None: + expected_state = "queued" + else: + expected_state = state + if state in ("success", "failed"): + expected_end_date = expected_logical_date + else: + expected_end_date = None assert response.json == { "conf": {}, "dag_id": "TEST_DAG_ID", "dag_run_id": expected_dag_run_id, - "end_date": None, + "end_date": expected_end_date, "execution_date": expected_logical_date, "logical_date": expected_logical_date, "external_trigger": True, "start_date": None, - "state": "queued", + "state": expected_state, "data_interval_end": expected_logical_date, "data_interval_start": expected_logical_date, "last_scheduling_decision": None, @@ -1213,6 +1228,27 @@ def test_should_response_400_for_non_dict_dagrun_conf(self, data, expected): assert response.status_code == 400 assert response.json["detail"] == expected + @pytest.mark.parametrize( + "data, expected", + [ + ( + { + "dag_run_id": "TEST_DAG_RUN", + "execution_date": "2020-06-11T18:00:00+00:00", + "state": "queueueued", + }, + "'queueueued' is not one of ['queued', 'running', 'success', 'failed'] - 'state'", + ) + ], + ) + def test_should_response_400_for_invalid_state(self, data, expected): + self._create_dag("TEST_DAG_ID") + response = self.client.post( + "api/v1/dags/TEST_DAG_ID/dagRuns", json=data, environ_overrides={"REMOTE_USER": "test"} + ) + assert response.status_code == 400 + assert response.json["detail"] == expected + def test_response_404(self): response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns", @@ -1246,9 +1282,12 @@ def test_response_404(self): ), pytest.param( "api/v1/dags/TEST_DAG_ID/dagRuns", - {"state": "failed", "execution_date": "2020-06-12T18:00:00+00:00"}, { - "detail": "Property is read-only - 'state'", + "last_scheduling_decision": "2020-06-12T18:00:00+00:00", + "execution_date": "2020-06-12T18:00:00+00:00", + }, + { + "detail": "Property is read-only - 'last_scheduling_decision'", "status": 400, "title": "Bad Request", "type": EXCEPTIONS_LINK_MAP[400],