2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -338,7 +338,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
run_id=run_id,
execution_date=logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
- state=DagRunState.QUEUED,
+ state=post_body.get("state", DagRunState.QUEUED),
conf=post_body.get("conf"),
external_trigger=True,
dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
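With this change, POST /api/v1/dags/{dag_id}/dagRuns accepts an optional `state` in the request body and falls back to `queued` when it is omitted. A minimal client-side sketch of the new field (the base URL, credentials, and `example_dag` id below are placeholders, not part of this PR):

```python
# Sketch only: assumes a local webserver with basic-auth enabled; adjust the URL,
# credentials, and dag_id for your own deployment.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
    auth=("admin", "admin"),
    json={
        "dag_run_id": "manual__2020-06-11T18:00:00+00:00",
        "logical_date": "2020-06-11T18:00:00+00:00",
        "conf": {},
        # New: request a state other than the default "queued".
        "state": "failed",
    },
)
resp.raise_for_status()
print(resp.json()["state"])  # expected to echo back "failed"
```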
2 changes: 1 addition & 1 deletion airflow/api_connexion/openapi/v1.yaml
@@ -2805,7 +2805,7 @@ components:
- dataset_triggered
state:
$ref: '#/components/schemas/DagState'
- readOnly: true
+ default: queued
external_trigger:
type: boolean
default: true
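The `DagState` schema referenced above is backed by Airflow's `DagRunState` enum, so only the four run states are accepted; anything else is rejected with a 400, as the new test further down asserts. A quick way to list them (not part of the diff):

```python
from airflow.utils.state import DagRunState

# Values a client may send in the "state" field of the POST body.
print([s.value for s in DagRunState])
# ['queued', 'running', 'success', 'failed']
```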
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/dag_run_schema.py
@@ -66,7 +66,7 @@ class Meta:
execution_date = auto_field(data_key="logical_date", validate=validate_istimezone)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
- state = DagStateField(dump_only=True)
+ state = DagStateField()
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
data_interval_start = auto_field(dump_only=True)
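Dropping `dump_only=True` lets the schema deserialize `state` from the request body as well as serialize it, which is what allows the endpoint hunk above to read it with a `queued` fallback. A small sketch of that fallback, where `post_body` stands in for the loaded request body:

```python
from airflow.utils.state import DagRunState

# Client omitted "state": keep the old default behaviour.
post_body = {"dag_run_id": "manual__run_1", "conf": {}}
assert post_body.get("state", DagRunState.QUEUED) == DagRunState.QUEUED

# Client asked for a terminal state: the value is handed to the run-creation call above.
post_body = {"dag_run_id": "manual__run_2", "state": "failed"}
assert post_body.get("state", DagRunState.QUEUED) == "failed"
```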
57 changes: 48 additions & 9 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1034,14 +1034,19 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids):
class TestPostDagRun(TestDagRunEndpoint):
@pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
@pytest.mark.parametrize(
"dag_run_id, logical_date, note",
"dag_run_id, logical_date, note, state",
[
pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"),
pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"),
pytest.param(None, None, None, id="all-missing"),
pytest.param(
"TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", "queued", id="all-present"
),
pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, id="only-date"),
pytest.param(None, None, None, None, id="all-missing"),
pytest.param(None, None, None, "failed", id="create-failed"),
],
)
- def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
+ def test_should_respond_200(
+     self, session, logical_date_field_name, dag_run_id, logical_date, note, state
+ ):
self._create_dag("TEST_DAG_ID")

# We'll patch airflow.utils.timezone.utcnow to always return this so we
@@ -1053,6 +1058,8 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id,
request_json[logical_date_field_name] = logical_date
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
+ if state is not None:
+     request_json["state"] = state
request_json["note"] = note
with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
response = self.client.post(
@@ -1070,16 +1077,24 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id,
expected_dag_run_id = f"manual__{expected_logical_date}"
else:
expected_dag_run_id = dag_run_id
+ if state is None:
+     expected_state = "queued"
+ else:
+     expected_state = state
+ if state in ("success", "failed"):
+     expected_end_date = expected_logical_date
+ else:
+     expected_end_date = None
assert response.json == {
"conf": {},
"dag_id": "TEST_DAG_ID",
"dag_run_id": expected_dag_run_id,
"end_date": None,
"end_date": expected_end_date,
"execution_date": expected_logical_date,
"logical_date": expected_logical_date,
"external_trigger": True,
"start_date": None,
"state": "queued",
"state": expected_state,
"data_interval_end": expected_logical_date,
"data_interval_start": expected_logical_date,
"last_scheduling_decision": None,
@@ -1213,6 +1228,27 @@ def test_should_response_400_for_non_dict_dagrun_conf(self, data, expected):
assert response.status_code == 400
assert response.json["detail"] == expected

+ @pytest.mark.parametrize(
+     "data, expected",
+     [
+         (
+             {
+                 "dag_run_id": "TEST_DAG_RUN",
+                 "execution_date": "2020-06-11T18:00:00+00:00",
+                 "state": "queueueued",
+             },
+             "'queueueued' is not one of ['queued', 'running', 'success', 'failed'] - 'state'",
+         )
+     ],
+ )
+ def test_should_response_400_for_invalid_state(self, data, expected):
+     self._create_dag("TEST_DAG_ID")
+     response = self.client.post(
+         "api/v1/dags/TEST_DAG_ID/dagRuns", json=data, environ_overrides={"REMOTE_USER": "test"}
+     )
+     assert response.status_code == 400
+     assert response.json["detail"] == expected
+
def test_response_404(self):
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
@@ -1246,9 +1282,12 @@ def test_response_404(self):
),
pytest.param(
"api/v1/dags/TEST_DAG_ID/dagRuns",
{"state": "failed", "execution_date": "2020-06-12T18:00:00+00:00"},
{
"detail": "Property is read-only - 'state'",
"last_scheduling_decision": "2020-06-12T18:00:00+00:00",
"execution_date": "2020-06-12T18:00:00+00:00",
},
{
"detail": "Property is read-only - 'last_scheduling_decision'",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],