diff --git a/airflow-core/src/airflow/api_fastapi/common/headers.py b/airflow-core/src/airflow/api_fastapi/common/headers.py index 7d1a0fa69613b..13567e32bdc7d 100644 --- a/airflow-core/src/airflow/api_fastapi/common/headers.py +++ b/airflow-core/src/airflow/api_fastapi/common/headers.py @@ -47,3 +47,30 @@ def header_accept_json_or_text_depends( HeaderAcceptJsonOrText = Annotated[Mimetype, Depends(header_accept_json_or_text_depends)] + + +def header_accept_json_or_ndjson_depends( + accept: Annotated[ + str, + Header( + json_schema_extra={ + "type": "string", + "enum": [Mimetype.JSON, Mimetype.NDJSON, Mimetype.ANY], + } + ), + ] = Mimetype.ANY, +) -> Mimetype: + if accept.startswith(Mimetype.ANY): + return Mimetype.ANY + if accept.startswith(Mimetype.JSON): + return Mimetype.JSON + if accept.startswith(Mimetype.NDJSON) or accept.startswith(Mimetype.ANY): + return Mimetype.NDJSON + + raise HTTPException( + status_code=status.HTTP_406_NOT_ACCEPTABLE, + detail="Only application/json or application/x-ndjson is supported", + ) + + +HeaderAcceptJsonOrNdjson = Annotated[Mimetype, Depends(header_accept_json_or_ndjson_depends)] diff --git a/airflow-core/src/airflow/api_fastapi/common/types.py b/airflow-core/src/airflow/api_fastapi/common/types.py index 0b431dfdef466..18e5dc7387d62 100644 --- a/airflow-core/src/airflow/api_fastapi/common/types.py +++ b/airflow-core/src/airflow/api_fastapi/common/types.py @@ -72,6 +72,7 @@ class Mimetype(str, Enum): TEXT = "text/plain" JSON = "application/json" + NDJSON = "application/x-ndjson" ANY = "*/*" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index ff6725b266b55..7478465911042 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -6354,7 +6354,7 @@ paths: type: string enum: - application/json - - text/plain + - application/x-ndjson - '*/*' default: '*/*' title: Accept @@ -6365,10 +6365,12 @@ paths: application/json: schema: $ref: '#/components/schemas/TaskInstancesLogResponse' - text/plain: + application/x-ndjson: schema: type: string - example: 'content + example: '{"content": "content"} + + {"content": "content"} ' '401': diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py index 3873cadf69d76..05313e2b69b5a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -28,7 +28,7 @@ from airflow.api_fastapi.common.dagbag import DagBagDep from airflow.api_fastapi.common.db.common import SessionDep -from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse @@ -43,13 +43,14 @@ tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" ) -text_example_response_for_get_log = { - Mimetype.TEXT: { +ndjson_example_response_for_get_log = { + Mimetype.NDJSON: { "schema": { "type": "string", "example": textwrap.dedent( """\ - content + {"content": "content"} + {"content": "content"} """ ), } @@ -63,7 +64,7 @@ **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), status.HTTP_200_OK: { "description": "Successful Response", - "content": text_example_response_for_get_log, + "content": ndjson_example_response_for_get_log, }, }, dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))], @@ -75,7 +76,7 @@ def get_log( dag_run_id: str, task_id: str, try_number: PositiveInt, - accept: HeaderAcceptJsonOrText, + accept: HeaderAcceptJsonOrNdjson, request: Request, dag_bag: DagBagDep, session: SessionDep, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 5af1938bea09a..a7efa4868c859 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1267,7 +1267,7 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 1ec386a67f906..f02690c160ba0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1744,7 +1744,7 @@ export const ensureUseTaskInstanceServiceGetLogData = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index c67b0e525cb18..b9039a10f3719 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1744,7 +1744,7 @@ export const prefetchUseTaskInstanceServiceGetLog = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 620dab69dc6d0..30b49d52aa330 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2081,7 +2081,7 @@ export const useTaskInstanceServiceGetLog = < token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 767004d466379..d525b0a662c39 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -2058,7 +2058,7 @@ export const useTaskInstanceServiceGetLogSuspense = < token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 18322378fd5f4..b04ce36ef78d4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2386,7 +2386,7 @@ export type PatchTaskInstanceDryRunData = { export type PatchTaskInstanceDryRunResponse = TaskInstanceCollectionResponse; export type GetLogData = { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "application/x-ndjson" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py index a906edc8b2d51..1b10e4c16a96e 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py @@ -213,9 +213,7 @@ def test_should_respond_200_json(self, try_number): ), ], ) - def test_should_respond_200_text_plain( - self, request_url, expected_filename, extra_query_string, try_number - ): + def test_should_respond_200_ndjson(self, request_url, expected_filename, extra_query_string, try_number): expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) key = self.app.state.secret_key @@ -225,7 +223,7 @@ def test_should_respond_200_text_plain( response = self.client.get( request_url, params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 200 @@ -281,7 +279,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu response = self.client.get( request_url, params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 200 @@ -316,7 +314,7 @@ def test_get_logs_with_metadata_as_download_large_file(self, try_number): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert "1st line" in response.content.decode("utf-8") @@ -384,7 +382,7 @@ def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", params={"token": token}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found" @@ -397,7 +395,7 @@ def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", params={"token": token, "map_index": 0}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found"