Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airflow/providers/edge/api_endpoints/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
22 changes: 22 additions & 0 deletions airflow/providers/edge/api_endpoints/health_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations


def health():
return {}
150 changes: 150 additions & 0 deletions airflow/providers/edge/api_endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import functools
import json
import logging
from typing import TYPE_CHECKING, Any, Callable

from flask import Response, request
from itsdangerous import BadSignature
from jwt import (
ExpiredSignatureError,
ImmatureSignatureError,
InvalidAudienceError,
InvalidIssuedAtError,
InvalidSignatureError,
)

from airflow.api_connexion.exceptions import PermissionDenied
from airflow.api_internal.endpoints.rpc_api_endpoint import (
initialize_method_map,
log_and_build_error_response,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.utils.jwt_signer import JWTSigner
from airflow.utils.session import create_session

if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse

log = logging.getLogger(__name__)


@functools.lru_cache
def _initialize_method_map() -> dict[str, Callable]:
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorker

internal_api_functions = initialize_method_map().values()
functions: list[Callable] = [
*internal_api_functions,
# Additional things from EdgeExecutor
EdgeJob.reserve_task,
EdgeJob.set_state,
EdgeLogs.push_logs,
EdgeWorker.register_worker,
EdgeWorker.set_state,
]
return {f"{func.__module__}.{func.__qualname__}": func for func in functions}


def edge_worker_api(body: dict[str, Any]) -> APIResponse:
"""Handle Edge Worker API `/edge_worker/v1/rpcapi` endpoint."""
# Note: Except the method map this is a 100% copy of internal API module
# airflow.api_internal.endpoints.rpc_api_endpoint.internal_airflow_api()
content_type = request.headers.get("Content-Type")
if content_type != "application/json":
raise PermissionDenied("Expected Content-Type: application/json")
accept = request.headers.get("Accept")
if accept != "application/json":
raise PermissionDenied("Expected Accept: application/json")
auth = request.headers.get("Authorization", "")
clock_grace = conf.getint("core", "internal_api_clock_grace", fallback=30)
signer = JWTSigner(
secret_key=conf.get("core", "internal_api_secret_key"),
expiration_time_in_seconds=clock_grace,
leeway_in_seconds=clock_grace,
audience="api",
)
try:
payload = signer.verify_token(auth)
signed_method = payload.get("method")
if not signed_method or signed_method != body.get("method"):
raise BadSignature("Invalid method in token authorization.")
except BadSignature:
raise PermissionDenied("Bad Signature. Please use only the tokens provided by the API.")
except InvalidAudienceError:
raise PermissionDenied("Invalid audience for the request")
except InvalidSignatureError:
raise PermissionDenied("The signature of the request was wrong")
except ImmatureSignatureError:
raise PermissionDenied("The signature of the request was sent from the future")
except ExpiredSignatureError:
raise PermissionDenied(
"The signature of the request has expired. Make sure that all components "
"in your system have synchronized clocks.",
)
except InvalidIssuedAtError:
raise PermissionDenied(
"The request was issues in the future. Make sure that all components "
"in your system have synchronized clocks.",
)
except Exception:
raise PermissionDenied("Unable to authenticate API via token.")

log.debug("Got request")
json_rpc = body.get("jsonrpc")
if json_rpc != "2.0":
return log_and_build_error_response(message="Expected jsonrpc 2.0 request.", status=400)

methods_map = _initialize_method_map()
method_name = body.get("method")
if method_name not in methods_map:
return log_and_build_error_response(message=f"Unrecognized method: {method_name}.", status=400)

handler = methods_map[method_name]
params = {}
try:
if body.get("params"):
params_json = body.get("params")
params = BaseSerialization.deserialize(params_json, use_pydantic_models=True)
except Exception:
return log_and_build_error_response(message="Error deserializing parameters.", status=400)

log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else None
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
# In case of AirflowException or other selective known types, transport the exception class back to caller
except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
16 changes: 16 additions & 0 deletions airflow/providers/edge/openapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
102 changes: 102 additions & 0 deletions airflow/providers/edge/openapi/edge_worker_api_v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
openapi: 3.0.2
info:
title: Airflow Edge Worker API
version: 1.0.0
description: |
This is Airflow Edge Worker API - which is a the access endpoint for workers
running on remote sites serving for Apache Airflow jobs. It also proxies internal API
to edge endpoints.

It is not intended to be used by any external code.

You can find more information in AIP-69
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=301795932


servers:
- url: /edge_worker/v1
description: Airflow Edge Worker API
paths:
"/rpcapi":
post:
deprecated: false
x-openapi-router-controller: airflow.providers.edge.api_endpoints.rpc_api_endpoint
operationId: edge_worker_api
tags:
- JSONRPC
parameters: []
responses:
'200':
description: Successful response
requestBody:
x-body-name: body
required: true
content:
application/json:
schema:
type: object
required:
- method
- jsonrpc
- params
properties:
jsonrpc:
type: string
default: '2.0'
description: JSON-RPC Version (2.0)
method:
type: string
description: Method name
params:
title: Parameters
type: object
"/health":
get:
operationId: health
deprecated: false
x-openapi-router-controller: airflow.providers.edge.api_endpoints.health_endpoint
tags:
- JSONRPC
parameters: []
responses:
'200':
description: Successful response
x-headers: []
x-explorer-enabled: true
x-proxy-enabled: true
components:
schemas:
JsonRpcRequired:
type: object
required:
- method
- jsonrpc
properties:
method:
type: string
description: Method name
jsonrpc:
type: string
default: '2.0'
description: JSON-RPC Version (2.0)
discriminator:
propertyName: method_name
tags: []
16 changes: 16 additions & 0 deletions airflow/providers/edge/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading