From 0dcb3ab55d37aca3cded5e67d1765ecc4c05c9fb Mon Sep 17 00:00:00 2001
From: Szymon Przedwojski
Date: Mon, 17 Sep 2018 10:35:08 +0200
Subject: [PATCH] [AIRFLOW-2912] Add Deploy and Delete operators for GCF

Both the Deploy and Delete operators interact with Google Cloud
Functions to manage functions. Both are idempotent and make use of
GcfHook - a hook that encapsulates communication with GCP over the
GCP API. An Invoke operator is also in the works, but it is waiting
for alpha IAM functionality that will soon be available and will help
protect the invoke operation.

Part of this commit is also a body field validator that is meant to be
reusable (but for now it is an integral part of the GCF operators). It
will be separated out into utils in one of the subsequent pull requests,
where it will be reused in other operators (the GCE ones).

Co-authored-by: potiuk
---
 .../example_gcp_function_delete.py            |  58 ++
 .../example_gcp_function_deploy_delete.py     | 113 ++++
 airflow/contrib/hooks/gcp_function_hook.py    | 195 ++++++
 .../operators/gcp_function_operator.py        | 563 ++++++++++++++++
 docs/howto/operator.rst                       | 120 ++++
 docs/integration.rst                          |  34 +
 .../operators/test_gcp_function_operator.py   | 624 ++++++++++++++++++
 7 files changed, 1707 insertions(+)
 create mode 100644 airflow/contrib/example_dags/example_gcp_function_delete.py
 create mode 100644 airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
 create mode 100644 airflow/contrib/hooks/gcp_function_hook.py
 create mode 100644 airflow/contrib/operators/gcp_function_operator.py
 create mode 100644 tests/contrib/operators/test_gcp_function_operator.py

diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py b/airflow/contrib/example_dags/example_gcp_function_delete.py
new file mode 100644
index 0000000000000..30f5369af60ef
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_function_delete.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.

+"""
+Example Airflow DAG that deletes a Google Cloud Function.
+
+This DAG relies on the following Airflow variables
+(https://airflow.apache.org/concepts.html#variables):
+
+* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
+* LOCATION - Google Cloud Functions region where the function exists.
+* ENTRYPOINT - Name of the executable function in the source code.
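+
+For local testing, the variables can also be set programmatically - a minimal
+sketch, with purely illustrative values:
+
+    from airflow import models
+    models.Variable.set('PROJECT_ID', 'example-project')
+    models.Variable.set('LOCATION', 'europe-west1')
+    models.Variable.set('ENTRYPOINT', 'helloWorld')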
+""" + +import datetime + +import airflow +from airflow import models +from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator + +# [START howto_operator_gcf_delete_args] +PROJECT_ID = models.Variable.get('PROJECT_ID', '') +LOCATION = models.Variable.get('LOCATION', '') +ENTRYPOINT = models.Variable.get('ENTRYPOINT', '') +# A fully-qualified name of the function to delete + +FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION, + ENTRYPOINT) +default_args = { + 'start_date': airflow.utils.dates.days_ago(1) +} +# [END howto_operator_gcf_delete_args] + +with models.DAG( + 'example_gcp_function_delete', + default_args=default_args, + schedule_interval=datetime.timedelta(days=1) +) as dag: + # [START howto_operator_gcf_delete] + t1 = GcfFunctionDeleteOperator( + task_id="gcf_delete_task", + name=FUNCTION_NAME + ) + # [END howto_operator_gcf_delete] diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py new file mode 100644 index 0000000000000..a0e44957b9204 --- /dev/null +++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that creates a Google Cloud Function and then deletes it. + +This DAG relies on the following Airflow variables +https://airflow.apache.org/concepts.html#variables +* PROJECT_ID - Google Cloud Project to use for the Cloud Function. +* LOCATION - Google Cloud Functions region where the function should be + created. +* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage +or + * SOURCE_UPLOAD_URL - Generated upload URL for the zipped source + or + * ZIP_PATH - Local path to the zipped source archive +or +* SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function is +defined in a supported Cloud Source Repository URL format +https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository +* ENTRYPOINT - Name of the executable function in the source code. 
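+
+Exactly one of the source code options above should be set. The example below
+picks the first non-empty variant in the order: archive URL, source repository,
+local zip path, upload URL - and raises an exception when none is provided.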
+""" + +import datetime + +from airflow import models +from airflow.contrib.operators.gcp_function_operator \ + import GcfFunctionDeployOperator, GcfFunctionDeleteOperator +from airflow.utils import dates + +# [START howto_operator_gcf_deploy_variables] +PROJECT_ID = models.Variable.get('PROJECT_ID', '') +LOCATION = models.Variable.get('LOCATION', '') +SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '') +SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '') +SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '') +ZIP_PATH = models.Variable.get('ZIP_PATH', '') +ENTRYPOINT = models.Variable.get('ENTRYPOINT', '') +FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION, + ENTRYPOINT) +RUNTIME = 'nodejs6' +VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True) + +# [END howto_operator_gcf_deploy_variables] + +# [START howto_operator_gcf_deploy_body] +body = { + "name": FUNCTION_NAME, + "entryPoint": ENTRYPOINT, + "runtime": RUNTIME, + "httpsTrigger": {} +} +# [END howto_operator_gcf_deploy_body] + +# [START howto_operator_gcf_deploy_args] +default_args = { + 'start_date': dates.days_ago(1), + 'project_id': PROJECT_ID, + 'location': LOCATION, + 'body': body, + 'validate_body': VALIDATE_BODY +} +# [END howto_operator_gcf_deploy_args] + +# [START howto_operator_gcf_deploy_variants] +if SOURCE_ARCHIVE_URL: + body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL +elif SOURCE_REPOSITORY: + body['sourceRepository'] = { + 'url': SOURCE_REPOSITORY + } +elif ZIP_PATH: + body['sourceUploadUrl'] = '' + default_args['zip_path'] = ZIP_PATH +elif SOURCE_UPLOAD_URL: + body['sourceUploadUrl'] = SOURCE_UPLOAD_URL +else: + raise Exception("Please provide one of the source_code parameters") +# [END howto_operator_gcf_deploy_variants] + + +with models.DAG( + 'example_gcp_function_deploy_delete', + default_args=default_args, + schedule_interval=datetime.timedelta(days=1) +) as dag: + # [START howto_operator_gcf_deploy] + deploy_task = GcfFunctionDeployOperator( + task_id="gcf_deploy_task", + name=FUNCTION_NAME + ) + # [END howto_operator_gcf_deploy] + delete_task = GcfFunctionDeleteOperator( + task_id="gcf_delete_task", + name=FUNCTION_NAME + ) + deploy_task >> delete_task diff --git a/airflow/contrib/hooks/gcp_function_hook.py b/airflow/contrib/hooks/gcp_function_hook.py new file mode 100644 index 0000000000000..d2da19285ab13 --- /dev/null +++ b/airflow/contrib/hooks/gcp_function_hook.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
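+
+"""
+Hook for Google Cloud Functions.
+
+A minimal usage sketch (illustrative only - the GCF operators normally create
+the hook themselves, and the function name below is hypothetical):
+
+    hook = GcfHook(api_version='v1')
+    function = hook.get_function(
+        'projects/example-project/locations/europe-west1/functions/helloWorld')
+"""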
+ +import time +import requests +from googleapiclient.discovery import build + +from airflow import AirflowException +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook + +# Number of retries - used by googleapiclient method calls to perform retries +# For requests that are "retriable" +NUM_RETRIES = 5 + +# Time to sleep between active checks of the operation results +TIME_TO_SLEEP_IN_SECONDS = 1 + + +# noinspection PyAbstractClass +class GcfHook(GoogleCloudBaseHook): + """ + Hook for Google Cloud Functions APIs. + """ + _conn = None + + def __init__(self, + api_version, + gcp_conn_id='google_cloud_default', + delegate_to=None): + super(GcfHook, self).__init__(gcp_conn_id, delegate_to) + self.api_version = api_version + + def get_conn(self): + """ + Retrieves connection to cloud functions. + + :return: Google Cloud Function services object + :rtype: dict + """ + if not self._conn: + http_authorized = self._authorize() + self._conn = build('cloudfunctions', self.api_version, + http=http_authorized, cache_discovery=False) + return self._conn + + def get_function(self, name): + """ + Returns the function with a given name. + + :param name: name of the function + :type name: str + :return: a CloudFunction object representing the function + :rtype: dict + """ + return self.get_conn().projects().locations().functions().get( + name=name).execute(num_retries=NUM_RETRIES) + + def list_functions(self, full_location): + """ + Lists all functions created in the location. + + :param full_location: full location including project. On the form + of /projects//location/ + :type full_location: str + :return: array of CloudFunction objects - representing functions in the location + :rtype: [dict] + """ + list_response = self.get_conn().projects().locations().functions().list( + parent=full_location).execute(num_retries=NUM_RETRIES) + return list_response.get("functions", []) + + def create_new_function(self, full_location, body): + """ + Creates new cloud function in location given with body specified. + + :param full_location: full location including project. On the form + of /projects//location/ + :type full_location: str + :param body: body required by the cloud function insert API + :type body: dict + :return: response returned by the operation + :rtype: dict + """ + response = self.get_conn().projects().locations().functions().create( + location=full_location, + body=body + ).execute(num_retries=NUM_RETRIES) + operation_name = response["name"] + return self._wait_for_operation_to_complete(operation_name) + + def update_function(self, name, body, update_mask): + """ + Updates cloud function according to the update mask specified. + + :param name: name of the function + :type name: str + :param body: body required by the cloud function patch API + :type body: str + :param update_mask: update mask - array of fields that should be patched + :type update_mask: [str] + :return: response returned by the operation + :rtype: dict + """ + response = self.get_conn().projects().locations().functions().patch( + updateMask=",".join(update_mask), + name=name, + body=body + ).execute(num_retries=NUM_RETRIES) + operation_name = response["name"] + return self._wait_for_operation_to_complete(operation_name) + + def upload_function_zip(self, parent, zip_path): + """ + Uploads zip file with sources. 
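+        The upload is a two-step process: a signed upload URL is first obtained
+        from the generateUploadUrl API method, and the zipped source is then
+        sent to that URL with an HTTP PUT request.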
+ + :param parent: project and location in which signed upload URL should be generated + in the form of /projects//location/ + :type parent: str + :param zip_path: path of the file to upload (should point to valid .zip file) + :type zip_path: str + :return: Upload URL that was returned by generateUploadUrl method + """ + response = self.get_conn().projects().locations().functions().generateUploadUrl( + parent=parent + ).execute(num_retries=NUM_RETRIES) + upload_url = response.get('uploadUrl') + with open(zip_path, 'rb') as fp: + requests.put( + url=upload_url, + data=fp.read(), + # Those two headers needs to be specified according to: + # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl + # nopep8 + headers={ + 'Content-type': 'application/zip', + 'x-goog-content-length-range': '0,104857600', + } + ) + return upload_url + + def delete_function(self, name): + """ + Deletes cloud function specified by name. + + :param name: name of the function + :type name: str + :return: response returned by the operation + :rtype: dict + """ + response = self.get_conn().projects().locations().functions().delete( + name=name).execute(num_retries=NUM_RETRIES) + operation_name = response["name"] + return self._wait_for_operation_to_complete(operation_name) + + def _wait_for_operation_to_complete(self, operation_name): + """ + Waits for the named operation to complete - checks status of the + asynchronous call. + + :param operation_name: name of the operation + :type operation_name: str + :return: response returned by the operation + :rtype: dict + :exception: AirflowException in case error is returned + """ + service = self.get_conn() + while True: + operation_response = service.operations().get( + name=operation_name, + ).execute(num_retries=NUM_RETRIES) + if operation_response.get("done"): + response = operation_response.get("response") + error = operation_response.get("error") + # Note, according to documentation always either response or error is + # set when "done" == True + if error: + raise AirflowException(str(error)) + return response + time.sleep(TIME_TO_SLEEP_IN_SECONDS) diff --git a/airflow/contrib/operators/gcp_function_operator.py b/airflow/contrib/operators/gcp_function_operator.py new file mode 100644 index 0000000000000..4455307c93259 --- /dev/null +++ b/airflow/contrib/operators/gcp_function_operator.py @@ -0,0 +1,563 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
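+
+"""
+Operators for Google Cloud Functions: GcfFunctionDeployOperator creates a new
+function or updates an existing one, and GcfFunctionDeleteOperator deletes a
+function. The module also contains a request body validator that is planned to
+be extracted to a shared utils module later (see the TODOs below).
+"""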
+import re + +from googleapiclient.errors import HttpError + +from airflow import AirflowException, LoggingMixin +from airflow.version import version +from airflow.models import BaseOperator +from airflow.contrib.hooks.gcp_function_hook import GcfHook +from airflow.utils.decorators import apply_defaults + +# TODO: This whole section should be extracted later to contrib/tools/field_validator.py + +COMPOSITE_FIELD_TYPES = ['union', 'dict'] + + +class FieldValidationException(AirflowException): + """ + Thrown when validation finds dictionary field not valid according to specification. + """ + + def __init__(self, message): + super(FieldValidationException, self).__init__(message) + + +class ValidationSpecificationException(AirflowException): + """ + Thrown when validation specification is wrong + (rather than dictionary being validated). + This should only happen during development as ideally + specification itself should not be invalid ;) . + """ + + def __init__(self, message): + super(ValidationSpecificationException, self).__init__(message) + + +# TODO: make better description, add some examples +# TODO: move to contrib/utils folder when we reuse it. +class BodyFieldValidator(LoggingMixin): + """ + Validates correctness of request body according to specification. + The specification can describe various type of + fields including custom validation, and union of fields. This validator is meant + to be reusable by various operators + in the near future, but for now it is left as part of the Google Cloud Function, + so documentation about the + validator is not yet complete. To see what kind of specification can be used, + please take a look at + gcp_function_operator.CLOUD_FUNCTION_VALIDATION which specifies validation + for GCF deploy operator. + + :param validation_specs: dictionary describing validation specification + :type validation_specs: [dict] + :param api_version: Version of the api used (for example v1) + :type api_version: str + + """ + def __init__(self, validation_specs, api_version): + # type: ([dict], str) -> None + super(BodyFieldValidator, self).__init__() + self._validation_specs = validation_specs + self._api_version = api_version + + @staticmethod + def _get_field_name_with_parent(field_name, parent): + if parent: + return parent + '.' + field_name + return field_name + + @staticmethod + def _sanity_checks(children_validation_specs, field_type, full_field_path, + regexp, custom_validation, value): + # type: (dict, str, str, str, function, object) -> None + if value is None and field_type != 'union': + raise FieldValidationException( + "The required body field '{}' is missing. Please add it.". + format(full_field_path)) + if regexp and field_type: + raise ValidationSpecificationException( + "The validation specification entry '{}' has both type and regexp. " + "The regexp is only allowed without type (i.e. assume type is 'str' " + "that can be validated with regexp)".format(full_field_path)) + if children_validation_specs and field_type not in COMPOSITE_FIELD_TYPES: + raise ValidationSpecificationException( + "Nested fields are specified in field '{}' of type '{}'. " + "Nested fields are only allowed for fields of those types: ('{}').". + format(full_field_path, field_type, COMPOSITE_FIELD_TYPES)) + if custom_validation and field_type: + raise ValidationSpecificationException( + "The validation specification field '{}' has both type and " + "custom_validation. Custom validation is only allowed without type.". 
+ format(full_field_path)) + + @staticmethod + def _validate_regexp(full_field_path, regexp, value): + # type: (str, str, str) -> None + if not re.match(regexp, value): + # Note matching of only the beginning as we assume the regexps all-or-nothing + raise FieldValidationException( + "The body field '{}' of value '{}' does not match the field " + "specification regexp: '{}'.". + format(full_field_path, value, regexp)) + + def _validate_dict(self, children_validation_specs, full_field_path, value): + # type: (dict, str, dict) -> None + for child_validation_spec in children_validation_specs: + self._validate_field(validation_spec=child_validation_spec, + dictionary_to_validate=value, + parent=full_field_path) + for field_name in value.keys(): + if field_name not in [spec['name'] for spec in children_validation_specs]: + self.log.warning( + "The field '{}' is in the body, but is not specified in the " + "validation specification '{}'. " + "This might be because you are using newer API version and " + "new field names defined for that version. Then the warning " + "can be safely ignored, or you might want to upgrade the operator" + "to the version that supports the new API version.".format( + self._get_field_name_with_parent(field_name, full_field_path), + children_validation_specs)) + + def _validate_union(self, children_validation_specs, full_field_path, + dictionary_to_validate): + # type: (dict, str, dict) -> None + field_found = False + found_field_name = None + for child_validation_spec in children_validation_specs: + # Forcing optional so that we do not have to type optional = True + # in specification for all union fields + new_field_found = self._validate_field( + validation_spec=child_validation_spec, + dictionary_to_validate=dictionary_to_validate, + parent=full_field_path, + force_optional=True) + field_name = child_validation_spec['name'] + if new_field_found and field_found: + raise FieldValidationException( + "The mutually exclusive fields '{}' and '{}' belonging to the " + "union '{}' are both present. Please remove one". + format(field_name, found_field_name, full_field_path)) + if new_field_found: + field_found = True + found_field_name = field_name + if not field_found: + self.log.warning( + "There is no '{}' union defined in the body {}. " + "Validation expected one of '{}' but could not find any. It's possible " + "that you are using newer API version and there is another union variant " + "defined for that version. Then the warning can be safely ignored, " + "or you might want to upgrade the operator to the version that " + "supports the new API version.".format( + full_field_path, + dictionary_to_validate, + [field['name'] for field in children_validation_specs])) + + def _validate_field(self, validation_spec, dictionary_to_validate, parent=None, + force_optional=False): + """ + Validates if field is OK. 
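+        It returns True when the field is present and valid, and False when the
+        field was skipped - i.e. when an optional field is missing or the field
+        is bound to a different API version than the validator was created with.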
+ :param validation_spec: specification of the field + :type validation_spec: dict + :param dictionary_to_validate: dictionary where the field should be present + :type dictionary_to_validate: dict + :param parent: full path of parent field + :type parent: str + :param force_optional: forces the field to be optional + (all union fields have force_optional set to True) + :type force_optional: bool + :return: True if the field is present + """ + field_name = validation_spec['name'] + field_type = validation_spec.get('type') + optional = validation_spec.get('optional') + regexp = validation_spec.get('regexp') + children_validation_specs = validation_spec.get('fields') + required_api_version = validation_spec.get('api_version') + custom_validation = validation_spec.get('custom_validation') + + full_field_path = self._get_field_name_with_parent(field_name=field_name, + parent=parent) + if required_api_version and required_api_version != self._api_version: + self.log.debug( + "Skipping validation of the field '{}' for API version '{}' " + "as it is only valid for API version '{}'". + format(field_name, self._api_version, required_api_version)) + return False + value = dictionary_to_validate.get(field_name) + + if (optional or force_optional) and value is None: + self.log.debug("The optional field '{}' is missing. That's perfectly OK.". + format(full_field_path)) + return False + + # Certainly down from here the field is present (value is not None) + # so we should only return True from now on + + self._sanity_checks(children_validation_specs=children_validation_specs, + field_type=field_type, + full_field_path=full_field_path, + regexp=regexp, + custom_validation=custom_validation, + value=value) + + if regexp: + self._validate_regexp(full_field_path, regexp, value) + elif field_type == 'dict': + if not isinstance(value, dict): + raise FieldValidationException( + "The field '{}' should be dictionary type according to " + "specification '{}' but it is '{}'". + format(full_field_path, validation_spec, value)) + if children_validation_specs is None: + self.log.debug( + "The dict field '{}' has no nested fields defined in the " + "specification '{}'. That's perfectly ok - it's content will " + "not be validated." + .format(full_field_path, validation_spec)) + else: + self._validate_dict(children_validation_specs, full_field_path, value) + elif field_type == 'union': + if not children_validation_specs: + raise ValidationSpecificationException( + "The union field '{}' has no nested fields " + "defined in specification '{}'. Unions should have at least one " + "nested field defined.".format(full_field_path, validation_spec)) + self._validate_union(children_validation_specs, full_field_path, + dictionary_to_validate) + elif custom_validation: + try: + custom_validation(value) + except Exception as e: + raise FieldValidationException( + "Error while validating custom field '{}' specified by '{}': '{}'". + format(full_field_path, validation_spec, e)) + elif field_type is None: + self.log.debug("The type of field '{}' is not specified in '{}'. " + "Not validating its content.". + format(full_field_path, validation_spec)) + else: + raise ValidationSpecificationException( + "The field '{}' is of type '{}' in specification '{}'." + "This type is unknown to validation!".format( + full_field_path, field_type, validation_spec)) + return True + + def validate(self, body_to_validate): + """ + Validates if the body (dictionary) follows specification that the validator was + instantiated with. 
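+        A minimal illustrative specification (the field names here are
+        hypothetical, not part of any real API):
+
+            validation_specs = [
+                dict(name="name", regexp="^.+$"),
+                dict(name="labels", optional=True),
+            ]
+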
Raises ValidationSpecificationException or + ValidationFieldException in case of problems with specification or the + body not conforming to the specification respectively. + :param body_to_validate: body that must follow the specification + :type body_to_validate: dict + :return: None + """ + try: + for validation_spec in self._validation_specs: + self._validate_field(validation_spec=validation_spec, + dictionary_to_validate=body_to_validate) + except FieldValidationException as e: + raise FieldValidationException( + "There was an error when validating: field '{}': '{}'". + format(body_to_validate, e)) + +# TODO End of field validator to be extracted + + +def _validate_available_memory_in_mb(value): + if int(value) <= 0: + raise FieldValidationException("The available memory has to be greater than 0") + + +def _validate_max_instances(value): + if int(value) <= 0: + raise FieldValidationException( + "The max instances parameter has to be greater than 0") + + +CLOUD_FUNCTION_VALIDATION = [ + dict(name="name", regexp="^.+$"), + dict(name="description", regexp="^.+$", optional=True), + dict(name="entryPoint", regexp=r'^.+$', optional=True), + dict(name="runtime", regexp=r'^.+$', optional=True), + dict(name="timeout", regexp=r'^.+$', optional=True), + dict(name="availableMemoryMb", custom_validation=_validate_available_memory_in_mb, + optional=True), + dict(name="labels", optional=True), + dict(name="environmentVariables", optional=True), + dict(name="network", regexp=r'^.+$', optional=True), + dict(name="maxInstances", optional=True, custom_validation=_validate_max_instances), + + dict(name="source_code", type="union", fields=[ + dict(name="sourceArchiveUrl", regexp=r'^.+$'), + dict(name="sourceRepositoryUrl", regexp=r'^.+$', api_version='v1beta2'), + dict(name="sourceRepository", type="dict", fields=[ + dict(name="url", regexp=r'^.+$') + ]), + dict(name="sourceUploadUrl") + ]), + + dict(name="trigger", type="union", fields=[ + dict(name="httpsTrigger", type="dict", fields=[ + # This dict should be empty at input (url is added at output) + ]), + dict(name="eventTrigger", type="dict", fields=[ + dict(name="eventType", regexp=r'^.+$'), + dict(name="resource", regexp=r'^.+$'), + dict(name="service", regexp=r'^.+$', optional=True), + dict(name="failurePolicy", type="dict", optional=True, fields=[ + dict(name="retry", type="dict", optional=True) + ]) + ]) + ]), +] + + +class GcfFunctionDeployOperator(BaseOperator): + """ + Create a function in Google Cloud Functions. + + :param project_id: Project ID that the operator works on + :type project_id: str + :param location: Region where the operator operates on + :type location: str + :param body: Body of the cloud function definition. The body must be a CloudFunction + dictionary as described in: + https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions + (note that different API versions require different + variants of the CloudFunction dictionary) + :type body: dict or google.cloud.functions.v1.CloudFunction + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: Version of the API used (for example v1). + :type api_version: str + :param zip_path: Path to zip file containing source code of the function. 
If it is + set, then sourceUploadUrl should not be specified in the body (or it should + be empty), then the zip file will be uploaded using upload URL generated + via generateUploadUrl from cloud functions API + :type zip_path: str + :param validate_body: If set to False, no body validation is performed. + :type validate_body: bool + """ + + @apply_defaults + def __init__(self, + project_id, + location, + body, + gcp_conn_id='google_cloud_default', + api_version='v1', + zip_path=None, + validate_body=True, + *args, **kwargs): + self.project_id = project_id + self.location = location + self.full_location = 'projects/{}/locations/{}'.format(self.project_id, + self.location) + self.body = body + self.gcp_conn_id = gcp_conn_id + self.api_version = api_version + self.zip_path = zip_path + self.zip_path_preprocessor = ZipPathPreprocessor(body, zip_path) + self.validate_body = validate_body + self._field_validator = BodyFieldValidator(CLOUD_FUNCTION_VALIDATION, + api_version=api_version) + self._hook = GcfHook(gcp_conn_id=self.gcp_conn_id, api_version=self.api_version) + self._validate_inputs() + super(GcfFunctionDeployOperator, self).__init__(*args, **kwargs) + + def _validate_inputs(self): + if not self.project_id: + raise AirflowException("The required parameter 'project_id' is missing") + if not self.location: + raise AirflowException("The required parameter 'location' is missing") + if not self.body: + raise AirflowException("The required parameter 'body' is missing") + self.zip_path_preprocessor.preprocess_body() + + def _validate_all_body_fields(self): + self._field_validator.validate(self.body) + + def _create_new_function(self): + self._hook.create_new_function(self.full_location, self.body) + + def _update_function(self): + self._hook.update_function(self.body['name'], self.body, self.body.keys()) + + def _check_if_function_exists(self): + name = self.body.get('name') + if not name: + raise FieldValidationException("The 'name' field should be present in " + "body: '{}'.".format(self.body)) + try: + self._hook.get_function(name) + except HttpError as e: + status = e.resp.status + if status == 404: + return False + raise e + return True + + def _upload_source_code(self): + return self._hook.upload_function_zip(parent=self.full_location, + zip_path=self.zip_path) + + def _set_airflow_version_label(self): + if 'labels' not in self.body.keys(): + self.body['labels'] = {} + self.body['labels'].update( + {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}) + + def execute(self, context): + if self.zip_path_preprocessor.should_upload_function(): + self.body[SOURCE_UPLOAD_URL] = self._upload_source_code() + if self.validate_body: + self._validate_all_body_fields() + self._set_airflow_version_label() + if not self._check_if_function_exists(): + self._create_new_function() + else: + self._update_function() + + +SOURCE_ARCHIVE_URL = 'sourceArchiveUrl' +SOURCE_UPLOAD_URL = 'sourceUploadUrl' +SOURCE_REPOSITORY = 'sourceRepository' +ZIP_PATH = 'zip_path' + + +class ZipPathPreprocessor: + """ + Pre-processes zip path parameter. + + Responsible for checking if the zip path parameter is correctly specified in + relation with source_code body fields. Non empty zip path parameter is special because + it is mutually exclusive with sourceArchiveUrl and sourceRepository body fields. + It is also mutually exclusive with non-empty sourceUploadUrl. + The pre-process modifies sourceUploadUrl body field in special way when zip_path + is not empty. 
An extra step is run when execute method is called and sourceUploadUrl + field value is set in the body with the value returned by generateUploadUrl Cloud + Function API method. + + :param body: Body passed to the create/update method calls. + :type body: dict + :param zip_path: path to the zip file containing source code. + :type body: dict + + """ + upload_function = None + + def __init__(self, body, zip_path): + self.body = body + self.zip_path = zip_path + + @staticmethod + def _is_present_and_empty(dictionary, field): + return field in dictionary and not dictionary[field] + + def _verify_upload_url_and_no_zip_path(self): + if self._is_present_and_empty(self.body, SOURCE_UPLOAD_URL): + if not self.zip_path: + raise AirflowException( + "Parameter '{}' is empty in the body and argument '{}' " + "is missing or empty. You need to have non empty '{}' " + "when '{}' is present and empty.". + format(SOURCE_UPLOAD_URL, ZIP_PATH, ZIP_PATH, SOURCE_UPLOAD_URL)) + + def _verify_upload_url_and_zip_path(self): + if SOURCE_UPLOAD_URL in self.body and self.zip_path: + if not self.body[SOURCE_UPLOAD_URL]: + self.upload_function = True + else: + raise AirflowException("Only one of '{}' in body or '{}' argument " + "allowed. Found both." + .format(SOURCE_UPLOAD_URL, ZIP_PATH)) + + def _verify_archive_url_and_zip_path(self): + if SOURCE_ARCHIVE_URL in self.body and self.zip_path: + raise AirflowException("Only one of '{}' in body or '{}' argument " + "allowed. Found both." + .format(SOURCE_ARCHIVE_URL, ZIP_PATH)) + + def should_upload_function(self): + if self.upload_function is None: + raise AirflowException('validate() method has to be invoked before ' + 'should_upload_function') + return self.upload_function + + def preprocess_body(self): + self._verify_archive_url_and_zip_path() + self._verify_upload_url_and_zip_path() + self._verify_upload_url_and_no_zip_path() + if self.upload_function is None: + self.upload_function = False + + +FUNCTION_NAME_PATTERN = '^projects/[^/]+/locations/[^/]+/functions/[^/]+$' +FUNCTION_NAME_COMPILED_PATTERN = re.compile(FUNCTION_NAME_PATTERN) + + +class GcfFunctionDeleteOperator(BaseOperator): + """ + Delete a function with specified name from Google Cloud Functions. + + :param name: A fully-qualified function name, matching + the pattern: `^projects/[^/]+/locations/[^/]+/functions/[^/]+$` + :type name: str + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: Version of the API used (for example v1). + :type api_version: str + """ + + @apply_defaults + def __init__(self, + name, + gcp_conn_id='google_cloud_default', + api_version='v1', + *args, **kwargs): + self.name = name + self.gcp_conn_id = gcp_conn_id + self.api_version = api_version + self._validate_inputs() + self.hook = GcfHook(gcp_conn_id=self.gcp_conn_id, api_version=self.api_version) + super(GcfFunctionDeleteOperator, self).__init__(*args, **kwargs) + + def _validate_inputs(self): + if not self.name: + raise AttributeError('Empty parameter: name') + else: + pattern = FUNCTION_NAME_COMPILED_PATTERN + if not pattern.match(self.name): + raise AttributeError( + 'Parameter name must match pattern: {}'.format(FUNCTION_NAME_PATTERN)) + + def execute(self, context): + try: + return self.hook.delete_function(self.name) + except HttpError as e: + status = e.resp.status + if status == 404: + self.log.info('The function does not exist in this project') + else: + self.log.error('An error occurred. 
Exiting.')
+            raise e
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index efd544efed3ba..0d973a391cc7b 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -101,3 +101,123 @@ to execute a BigQuery load job.
     :dedent: 4
     :start-after: [START howto_operator_gcs_to_bq]
     :end-before: [END howto_operator_gcs_to_bq]
+
+GcfFunctionDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the ``default_args`` dict to pass arguments to the operator.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_delete_args]
+   :end-before: [END howto_operator_gcf_delete_args]
+
+Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`
+to delete a function from Google Cloud Functions.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_delete]
+   :end-before: [END howto_operator_gcf_delete]
+
+Troubleshooting
+"""""""""""""""
+
+If you run the operator with a service account and get "forbidden 403" errors,
+it means that your service account does not have enough permissions set via
+IAM:
+
+* First you need to assign your service account the "Cloud Functions
+  Developer" role.
+* Make sure you grant the user the IAM Service Account User role on the Cloud
+  Functions Runtime service account.
+
+A typical way of doing this with gcloud is shown below - just replace
+PROJECT_ID with the ID of your project and SERVICE_ACCOUNT_EMAIL with the
+email address of your service account.
+
+.. code-block:: bash
+
+  gcloud iam service-accounts add-iam-policy-binding \
+    PROJECT_ID@appspot.gserviceaccount.com \
+    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
+    --role="roles/iam.serviceAccountUser"
+
+See `Adding the IAM service agent user role to the runtime service `_ for
+details.
+
+GcfFunctionDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`
+to deploy a function to Google Cloud Functions.
+
+The examples below use Airflow variables to show the various variants and
+combinations of default_args you can use. The variables are defined as
+follows:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_deploy_variables]
+   :end-before: [END howto_operator_gcf_deploy_variables]
+
+With those variables you can define the body of the request:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_deploy_body]
+   :end-before: [END howto_operator_gcf_deploy_body]
+
+The default_args dictionary you pass when creating the DAG can be used to pass
+the body and other arguments:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_deploy_args]
+   :end-before: [END howto_operator_gcf_deploy_args]
+
+Note that neither the body nor the default args are complete in the above
+examples. Depending on the variables set, there are different ways to pass the
+source code related fields. Currently you can pass either
+`sourceArchiveUrl`, `sourceRepository` or `sourceUploadUrl` as described in the
+`CloudFunction API specification `_.
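+
+For example, a body using the `sourceRepository` variant might look as follows
+(a sketch only - it reuses the variables defined above, and the repository URL
+mirrors the format used in the tests and is purely illustrative):
+
+.. code-block:: python
+
+  body = {
+      "name": FUNCTION_NAME,
+      "entryPoint": ENTRYPOINT,
+      "runtime": RUNTIME,
+      "httpsTrigger": {},
+      "sourceRepository": {
+          "url": "https://source.developers.google.com/projects/example-project"
+                 "/repos/example-repo/revisions/master/paths/example-function"
+      }
+  }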
+Additionally, default_args might contain the `zip_path` parameter, which runs
+an extra step of uploading the source code before deploying it. In that case
+you also need to provide an empty `sourceUploadUrl` parameter in the body.
+
+Example logic of setting the source code related fields based on the variables
+defined above is shown here:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_deploy_variants]
+   :end-before: [END howto_operator_gcf_deploy_variants]
+
+The code to create the operator:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+   :language: python
+   :start-after: [START howto_operator_gcf_deploy]
+   :end-before: [END howto_operator_gcf_deploy]
+
+Troubleshooting
+"""""""""""""""
+
+If you run the deploy operator with a service account and get "forbidden 403"
+errors, it means that your service account does not have enough permissions
+set via IAM:
+
+* First you need to assign your service account the "Cloud Functions
+  Developer" role.
+* Make sure you grant the user the IAM Service Account User role on the Cloud
+  Functions Runtime service account.
+
+A typical way of doing this with gcloud is shown below - just replace
+PROJECT_ID with the ID of your project and SERVICE_ACCOUNT_EMAIL with the
+email address of your service account.
+
+.. code-block:: bash
+
+  gcloud iam service-accounts add-iam-policy-binding \
+    PROJECT_ID@appspot.gserviceaccount.com \
+    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
+    --role="roles/iam.serviceAccountUser"
+
+See `Adding the IAM service agent user role to the runtime service `_ for
+details.
+
+Also make sure that your service account has access to the source code of the
+function if it needs to be downloaded. For example, this might mean adding the
+Source Repository Viewer role to the service account if the source code lives
+in a Google Cloud Source Repository.
diff --git a/docs/integration.rst b/docs/integration.rst
index 522d4bbd446ab..c4800d65ac298 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -448,6 +448,40 @@ BigQueryHook
     :members:


+Cloud Functions
+'''''''''''''''
+
+Cloud Functions Operators
+"""""""""""""""""""""""""
+
+- :ref:`GcfFunctionDeployOperator` : deploy a Google Cloud Function to the cloud.
+- :ref:`GcfFunctionDeleteOperator` : delete a Google Cloud Function in the cloud.
+
+.. _GcfFunctionDeployOperator:
+
+GcfFunctionDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator
+
+
+.. _GcfFunctionDeleteOperator:
+
+GcfFunctionDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator
+
+
+Cloud Functions Hook
+""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.gcp_function_hook.GcfHook
+    :members:
+
+
 Cloud DataFlow
 ''''''''''''''
diff --git a/tests/contrib/operators/test_gcp_function_operator.py b/tests/contrib/operators/test_gcp_function_operator.py
new file mode 100644
index 0000000000000..d7585ae66fdef
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_function_operator.py
@@ -0,0 +1,624 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from googleapiclient.errors import HttpError +from parameterized import parameterized + +from airflow.contrib.operators.gcp_function_operator import \ + GcfFunctionDeployOperator, GcfFunctionDeleteOperator, FUNCTION_NAME_PATTERN +from airflow import AirflowException +from airflow.version import version + +from copy import deepcopy + +try: + # noinspection PyProtectedMember + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +EMPTY_CONTENT = ''.encode('utf8') +MOCK_RESP_404 = type('', (object,), {"status": 404})() + +PROJECT_ID = 'test_project_id' +LOCATION = 'test_region' +SOURCE_ARCHIVE_URL = 'gs://folder/file.zip' +ENTRYPOINT = 'helloWorld' +FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION, + ENTRYPOINT) +RUNTIME = 'nodejs6' +VALID_RUNTIMES = ['nodejs6', 'nodejs8', 'python37'] +VALID_BODY = { + "name": FUNCTION_NAME, + "entryPoint": ENTRYPOINT, + "runtime": RUNTIME, + "httpsTrigger": {}, + "sourceArchiveUrl": SOURCE_ARCHIVE_URL +} + + +def _prepare_test_bodies(): + body_no_name = deepcopy(VALID_BODY) + body_no_name.pop('name', None) + body_empty_entry_point = deepcopy(VALID_BODY) + body_empty_entry_point['entryPoint'] = '' + body_empty_runtime = deepcopy(VALID_BODY) + body_empty_runtime['runtime'] = '' + body_values = [ + ({}, "The required parameter 'body' is missing"), + (body_no_name, "The required body field 'name' is missing"), + (body_empty_entry_point, + "The body field 'entryPoint' of value '' does not match"), + (body_empty_runtime, "The body field 'runtime' of value '' does not match"), + ] + return body_values + + +class GcfFunctionDeployTest(unittest.TestCase): + @parameterized.expand(_prepare_test_bodies()) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_body_empty_or_missing_fields(self, body, message, mock_hook): + mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl' + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn(message, str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_deploy_execute(self, mock_hook): + mock_hook.return_value.get_function.side_effect = mock.Mock( + side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found')) + mock_hook.return_value.create_new_function.return_value = True + op = GcfFunctionDeployOperator( + project_id=PROJECT_ID, + location=LOCATION, + body=deepcopy(VALID_BODY), + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.get_function.assert_called_once_with( + 
'projects/test_project_id/locations/test_region/functions/helloWorld' + ) + expected_body = deepcopy(VALID_BODY) + expected_body['labels'] = { + 'airflow-version': 'v' + version.replace('.', '-').replace('+', '-') + } + mock_hook.return_value.create_new_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region', + expected_body + ) + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_update_function_if_exists(self, mock_hook): + mock_hook.return_value.get_function.return_value = True + mock_hook.return_value.update_function.return_value = True + op = GcfFunctionDeployOperator( + project_id=PROJECT_ID, + location=LOCATION, + body=deepcopy(VALID_BODY), + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.get_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region/functions/helloWorld' + ) + expected_body = deepcopy(VALID_BODY) + expected_body['labels'] = { + 'airflow-version': 'v' + version.replace('.', '-').replace('+', '-') + } + mock_hook.return_value.update_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region/functions/helloWorld', + expected_body, expected_body.keys()) + mock_hook.return_value.create_new_function.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_empty_project_id(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GcfFunctionDeployOperator( + project_id="", + location="test_region", + body=None, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter 'project_id' is missing", str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_empty_location(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GcfFunctionDeployOperator( + project_id="test_project_id", + location="", + body=None, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter 'location' is missing", str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_empty_body(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=None, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter 'body' is missing", str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + @parameterized.expand([ + (runtime,) for runtime in VALID_RUNTIMES + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_correct_runtime_field(self, runtime, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['runtime'] = runtime + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + (network,) for network in [ + "network-01", + "n-0-2-3-4", + 
"projects/PROJECT/global/networks/network-01" + "projects/PRÓJECT/global/networks/netwórk-01" + ] + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_valid_network_field(self, network, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['network'] = network + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + (labels,) for labels in [ + {}, + {"label": 'value-01'}, + {"label_324234_a_b_c": 'value-01_93'}, + ] + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_valid_labels_field(self, labels, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['labels'] = labels + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_validation_disabled(self, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = { + "name": "function_name", + "some_invalid_body_field": "some_invalid_body_field_value" + } + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + validate_body=False, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_body_validation_simple(self, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['name'] = '' + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn("The body field 'name' of value '' does not match", + str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + ('name', '', + "The body field 'name' of value '' does not match"), + ('description', '', "The body field 'description' of value '' does not match"), + ('entryPoint', '', "The body field 'entryPoint' of value '' does not match"), + ('availableMemoryMb', '0', + "The available memory has to be greater than 0"), + ('availableMemoryMb', '-1', + "The available memory has to be greater than 0"), + ('availableMemoryMb', 'ss', + "invalid literal for int() with base 10: 'ss'"), + ('network', '', "The body field 'network' of value '' does not match"), + ('maxInstances', '0', "The max instances parameter has to be greater than 0"), + ('maxInstances', '-1', "The max instances parameter has to be greater than 0"), + ('maxInstances', 'ss', "invalid literal for int() with base 10: 'ss'"), + ]) + 
@mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_invalid_field_values(self, key, value, message, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body[key] = value + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn(message, str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + ({'sourceArchiveUrl': ''}, + "The body field 'source_code.sourceArchiveUrl' of value '' does not match"), + ({'sourceArchiveUrl': '', 'zip_path': '/path/to/file'}, + "Only one of 'sourceArchiveUrl' in body or 'zip_path' argument allowed."), + ({'sourceArchiveUrl': 'gs://url', 'zip_path': '/path/to/file'}, + "Only one of 'sourceArchiveUrl' in body or 'zip_path' argument allowed."), + ({'sourceArchiveUrl': '', 'sourceUploadUrl': ''}, + "Parameter 'sourceUploadUrl' is empty in the body and argument " + "'zip_path' is missing or empty."), + ({'sourceArchiveUrl': 'gs://adasda', 'sourceRepository': ''}, + "The field 'source_code.sourceRepository' should be dictionary type"), + ({'sourceUploadUrl': '', 'sourceRepository': ''}, + "Parameter 'sourceUploadUrl' is empty in the body and argument 'zip_path' " + "is missing or empty."), + ({'sourceArchiveUrl': '', 'sourceUploadUrl': '', 'sourceRepository': ''}, + "Parameter 'sourceUploadUrl' is empty in the body and argument 'zip_path' " + "is missing or empty."), + ({'sourceArchiveUrl': 'gs://url', 'sourceUploadUrl': 'https://url'}, + "The mutually exclusive fields 'sourceUploadUrl' and 'sourceArchiveUrl' " + "belonging to the union 'source_code' are both present. Please remove one"), + ({'sourceUploadUrl': 'https://url', 'zip_path': '/path/to/file'}, + "Only one of 'sourceUploadUrl' in body " + "or 'zip_path' argument allowed. 
Found both."), + ({'sourceUploadUrl': ''}, "Parameter 'sourceUploadUrl' is empty in the body " + "and argument 'zip_path' is missing or empty."), + ({'sourceRepository': ''}, "The field 'source_code.sourceRepository' " + "should be dictionary type"), + ({'sourceRepository': {}}, "The required body field " + "'source_code.sourceRepository.url' is missing"), + ({'sourceRepository': {'url': ''}}, + "The body field 'source_code.sourceRepository.url' of value '' does not match"), + ] + ) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_invalid_source_code_union_field(self, source_code, message, mock_hook): + mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl' + body = deepcopy(VALID_BODY) + body.pop('sourceUploadUrl', None) + body.pop('sourceArchiveUrl', None) + zip_path = source_code.pop('zip_path', None) + body.update(source_code) + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id", + zip_path=zip_path + ) + op.execute(None) + err = cm.exception + self.assertIn(message, str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + ({'sourceArchiveUrl': 'gs://url'},), + ({'zip_path': '/path/to/file', 'sourceUploadUrl': None},), + ({'sourceUploadUrl': + 'https://source.developers.google.com/projects/a/repos/b/revisions/c/paths/d'},), + ({'sourceRepository': + {'url': 'https://source.developers.google.com/projects/a/' + 'repos/b/revisions/c/paths/d'}},), + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_valid_source_code_union_field(self, source_code, mock_hook): + mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl' + mock_hook.return_value.get_function.side_effect = mock.Mock( + side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found')) + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body.pop('sourceUploadUrl', None) + body.pop('sourceArchiveUrl', None) + body.pop('sourceRepository', None) + body.pop('sourceRepositoryUrl', None) + zip_path = source_code.pop('zip_path', None) + body.update(source_code) + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id", + zip_path=zip_path + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + if zip_path: + mock_hook.return_value.upload_function_zip.assert_called_once_with( + parent='projects/test_project_id/locations/test_region', + zip_path='/path/to/file' + ) + mock_hook.return_value.get_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region/functions/helloWorld' + ) + mock_hook.return_value.create_new_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region', + body + ) + mock_hook.reset_mock() + + @parameterized.expand([ + ({'eventTrigger': {}}, + "The required body field 'trigger.eventTrigger.eventType' is missing"), + ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b'}}, + "The required body field 'trigger.eventTrigger.resource' is missing"), + ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b', 'resource': ''}}, + "The body field 'trigger.eventTrigger.resource' of value '' does not match"), + ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b', + 
+    @parameterized.expand([
+        ({'eventTrigger': {}},
+         "The required body field 'trigger.eventTrigger.eventType' is missing"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b'}},
+         "The required body field 'trigger.eventTrigger.resource' is missing"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': ''}},
+         "The body field 'trigger.eventTrigger.resource' of value '' does not match"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': ''}},
+         "The body field 'trigger.eventTrigger.service' of value '' does not match"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': 'service_name',
+                           'failurePolicy': {'retry': ''}}},
+         "The field 'trigger.eventTrigger.failurePolicy.retry' "
+         "should be dictionary type")
+    ])
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_invalid_trigger_union_field(self, trigger, message, mock_hook):
+        mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl'
+        body = deepcopy(VALID_BODY)
+        body.pop('httpsTrigger', None)
+        body.pop('eventTrigger', None)
+        body.update(trigger)
+        with self.assertRaises(AirflowException) as cm:
+            op = GcfFunctionDeployOperator(
+                project_id="test_project_id",
+                location="test_region",
+                body=body,
+                task_id="id",
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn(message, str(err))
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.reset_mock()
+
+    @parameterized.expand([
+        ({'httpsTrigger': {}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res'}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': 'service_name'}},),
+        # non-ASCII values should pass validation as well
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/ą.b',
+                           'resource': 'reś',
+                           'service': 'service_namę'}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': 'service_name',
+                           'failurePolicy': {'retry': {}}}},)
+    ])
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_valid_trigger_union_field(self, trigger, mock_hook):
+        mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl'
+        # 404 from get_function means the function does not exist yet,
+        # so a new one is created
+        mock_hook.return_value.get_function.side_effect = HttpError(
+            resp=MOCK_RESP_404, content=b'not found')
+        mock_hook.return_value.create_new_function.return_value = True
+        body = deepcopy(VALID_BODY)
+        body.pop('httpsTrigger', None)
+        body.pop('eventTrigger', None)
+        body.update(trigger)
+        op = GcfFunctionDeployOperator(
+            project_id="test_project_id",
+            location="test_region",
+            body=body,
+            task_id="id",
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.get_function.assert_called_once_with(
+            'projects/test_project_id/locations/test_region/functions/helloWorld'
+        )
+        mock_hook.return_value.create_new_function.assert_called_once_with(
+            'projects/test_project_id/locations/test_region',
+            body
+        )
+        mock_hook.reset_mock()
+
+
+class GcfFunctionDeleteTest(unittest.TestCase):
+    _FUNCTION_NAME = 'projects/project_name/locations/project_location/functions' \
+                     '/function_name'
+    _DELETE_FUNCTION_EXPECTED = {
+        '@type': 'type.googleapis.com/google.cloud.functions.v1.CloudFunction',
+        'name': _FUNCTION_NAME,
+        'sourceArchiveUrl': 'gs://functions/hello.zip',
+        'httpsTrigger': {
+            'url': 'https://project_location-project_name.cloudfunctions.net'
+                   '/function_name'},
+        'status': 'ACTIVE',
+        'entryPoint': 'entry_point',
+        'timeout': '60s',
+        'availableMemoryMb': 256,
+        'serviceAccountEmail': 'project_name@appspot.gserviceaccount.com',
+        'updateTime': '2018-08-23T00:00:00Z',
+        'versionId': '1',
+        'runtime': 'nodejs6'}
+
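+    # The delete operator is expected to be idempotent: a 404 from GCF is
+    # silenced (the function is already gone), while any other HttpError
+    # bubbles up and fails the task.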
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_delete_execute(self, mock_hook):
+        mock_hook.return_value.delete_function.return_value = \
+            self._DELETE_FUNCTION_EXPECTED
+        op = GcfFunctionDeleteOperator(
+            name=self._FUNCTION_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.delete_function.assert_called_once_with(
+            'projects/project_name/locations/project_location/functions/function_name'
+        )
+        self.assertEqual(result['name'], self._FUNCTION_NAME)
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_correct_name(self, mock_hook):
+        op = GcfFunctionDeleteOperator(
+            name="projects/project_name/locations/project_location/functions"
+                 "/function_name",
+            task_id="id"
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_invalid_name(self, mock_hook):
+        with self.assertRaises(AttributeError) as cm:
+            op = GcfFunctionDeleteOperator(
+                name="invalid_name",
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertEqual(str(err), 'Parameter name must match pattern: {}'.format(
+            FUNCTION_NAME_PATTERN))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_empty_name(self, mock_hook):
+        with self.assertRaises(AttributeError) as cm:
+            GcfFunctionDeleteOperator(
+                name="",
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertEqual(str(err), 'Empty parameter: name')
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_gcf_error_silenced_when_function_doesnt_exist(self, mock_hook):
+        op = GcfFunctionDeleteOperator(
+            name=self._FUNCTION_NAME,
+            task_id="id"
+        )
+        mock_hook.return_value.delete_function.side_effect = HttpError(
+            resp=MOCK_RESP_404, content=b'not found')
+        op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.delete_function.assert_called_once_with(
+            'projects/project_name/locations/project_location/functions/function_name'
+        )
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_non_404_gcf_error_bubbled_up(self, mock_hook):
+        op = GcfFunctionDeleteOperator(
+            name=self._FUNCTION_NAME,
+            task_id="id"
+        )
+        # a minimal response stub suffices, since the 404-silencing logic
+        # only inspects resp.status
+        resp = type('', (object,), {"status": 500})()
+        mock_hook.return_value.delete_function.side_effect = HttpError(
+            resp=resp, content=b'error')
+
+        with self.assertRaises(HttpError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.delete_function.assert_called_once_with(
+            'projects/project_name/locations/project_location/functions/function_name'
+        )