diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py b/airflow/contrib/example_dags/example_gcp_function_delete.py
new file mode 100644
index 0000000000000..30f5369af60ef
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_function_delete.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that deletes a Google Cloud Function.
+
+This DAG relies on the following Airflow variables
+(see https://airflow.apache.org/concepts.html#variables):
+
+* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
+* LOCATION - Google Cloud Functions region where the function exists.
+* ENTRYPOINT - Name of the executable function in the source code.
+"""
+
+import datetime
+
+import airflow
+from airflow import models
+from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator
+
+# [START howto_operator_gcf_delete_args]
+PROJECT_ID = models.Variable.get('PROJECT_ID', '')
+LOCATION = models.Variable.get('LOCATION', '')
+ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
+# The fully-qualified name of the function to delete
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
+                                                               ENTRYPOINT)
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+# [END howto_operator_gcf_delete_args]
+
+with models.DAG(
+    'example_gcp_function_delete',
+    default_args=default_args,
+    schedule_interval=datetime.timedelta(days=1)
+) as dag:
+    # [START howto_operator_gcf_delete]
+    t1 = GcfFunctionDeleteOperator(
+        task_id="gcf_delete_task",
+        name=FUNCTION_NAME
+    )
+    # [END howto_operator_gcf_delete]
diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
new file mode 100644
index 0000000000000..a0e44957b9204
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
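+
+# The example DAGs in this module and in example_gcp_function_delete.py read
+# their configuration from Airflow Variables. A minimal sketch of seeding them
+# from the CLI (assuming the Airflow 1.10-era ``airflow variables`` syntax;
+# the values below are placeholders only):
+#
+#   airflow variables --set PROJECT_ID my-gcp-project
+#   airflow variables --set LOCATION europe-west1
+#   airflow variables --set ENTRYPOINT helloWorld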
+
+"""
+Example Airflow DAG that creates a Google Cloud Function and then deletes it.
+
+This DAG relies on the following Airflow variables
+(see https://airflow.apache.org/concepts.html#variables):
+
+* PROJECT_ID - Google Cloud Project to use for the Cloud Function.
+* LOCATION - Google Cloud Functions region where the function should be
+  created.
+* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage,
+or
+    * SOURCE_UPLOAD_URL - Generated upload URL for the zipped source,
+    or
+    * ZIP_PATH - Local path to the zipped source archive,
+or
+* SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function
+  is defined in a supported Cloud Source Repository URL format:
+  https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
+* ENTRYPOINT - Name of the executable function in the source code.
+"""
+
+import datetime
+
+from airflow import models
+from airflow.contrib.operators.gcp_function_operator \
+    import GcfFunctionDeployOperator, GcfFunctionDeleteOperator
+from airflow.utils import dates
+
+# [START howto_operator_gcf_deploy_variables]
+PROJECT_ID = models.Variable.get('PROJECT_ID', '')
+LOCATION = models.Variable.get('LOCATION', '')
+SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
+SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
+SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
+ZIP_PATH = models.Variable.get('ZIP_PATH', '')
+ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
+FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
+                                                               ENTRYPOINT)
+RUNTIME = 'nodejs6'
+VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
+
+# [END howto_operator_gcf_deploy_variables]
+
+# [START howto_operator_gcf_deploy_body]
+body = {
+    "name": FUNCTION_NAME,
+    "entryPoint": ENTRYPOINT,
+    "runtime": RUNTIME,
+    "httpsTrigger": {}
+}
+# [END howto_operator_gcf_deploy_body]
+
+# [START howto_operator_gcf_deploy_args]
+default_args = {
+    'start_date': dates.days_ago(1),
+    'project_id': PROJECT_ID,
+    'location': LOCATION,
+    'body': body,
+    'validate_body': VALIDATE_BODY
+}
+# [END howto_operator_gcf_deploy_args]
+
+# [START howto_operator_gcf_deploy_variants]
+if SOURCE_ARCHIVE_URL:
+    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
+elif SOURCE_REPOSITORY:
+    body['sourceRepository'] = {
+        'url': SOURCE_REPOSITORY
+    }
+elif ZIP_PATH:
+    body['sourceUploadUrl'] = ''
+    default_args['zip_path'] = ZIP_PATH
+elif SOURCE_UPLOAD_URL:
+    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
+else:
+    raise Exception("Please provide one of the source_code parameters")
+# [END howto_operator_gcf_deploy_variants]
+
+
+with models.DAG(
+    'example_gcp_function_deploy_delete',
+    default_args=default_args,
+    schedule_interval=datetime.timedelta(days=1)
+) as dag:
+    # [START howto_operator_gcf_deploy]
+    deploy_task = GcfFunctionDeployOperator(
+        task_id="gcf_deploy_task",
+        name=FUNCTION_NAME
+    )
+    # [END howto_operator_gcf_deploy]
+    delete_task = GcfFunctionDeleteOperator(
+        task_id="gcf_delete_task",
+        name=FUNCTION_NAME
+    )
+    deploy_task >> delete_task
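+
+# A hedged sketch of an alternative trigger: instead of the empty
+# ``httpsTrigger`` used above, the body can request a background trigger.
+# The event type and resource below are illustrative placeholders only:
+#
+#   body["eventTrigger"] = {
+#       "eventType": "google.storage.object.finalize",
+#       "resource": "projects/_/buckets/example-bucket",
+#   }
+#   body.pop("httpsTrigger", None)  # the two triggers are mutually exclusive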
diff --git a/airflow/contrib/hooks/gcp_function_hook.py b/airflow/contrib/hooks/gcp_function_hook.py
new file mode 100644
index 0000000000000..d2da19285ab13
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_function_hook.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+import requests
+from googleapiclient.discovery import build
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+# Number of retries - used by googleapiclient method calls to perform retries
+# for requests that are "retriable"
+NUM_RETRIES = 5
+
+# Time to sleep between active checks of the operation results
+TIME_TO_SLEEP_IN_SECONDS = 1
+
+
+# noinspection PyAbstractClass
+class GcfHook(GoogleCloudBaseHook):
+    """
+    Hook for the Google Cloud Functions API.
+    """
+    _conn = None
+
+    def __init__(self,
+                 api_version,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(GcfHook, self).__init__(gcp_conn_id, delegate_to)
+        self.api_version = api_version
+
+    def get_conn(self):
+        """
+        Retrieves the connection to Cloud Functions.
+
+        :return: Google Cloud Functions services object
+        :rtype: dict
+        """
+        if not self._conn:
+            http_authorized = self._authorize()
+            self._conn = build('cloudfunctions', self.api_version,
+                               http=http_authorized, cache_discovery=False)
+        return self._conn
+
+    def get_function(self, name):
+        """
+        Returns the function with the given name.
+
+        :param name: name of the function
+        :type name: str
+        :return: a CloudFunction object representing the function
+        :rtype: dict
+        """
+        return self.get_conn().projects().locations().functions().get(
+            name=name).execute(num_retries=NUM_RETRIES)
+
+    def list_functions(self, full_location):
+        """
+        Lists all functions created in the location.
+
+        :param full_location: full location including the project, in the form of
+            ``projects/<PROJECT>/locations/<LOCATION>``
+        :type full_location: str
+        :return: array of CloudFunction objects - representing functions in the location
+        :rtype: [dict]
+        """
+        list_response = self.get_conn().projects().locations().functions().list(
+            parent=full_location).execute(num_retries=NUM_RETRIES)
+        return list_response.get("functions", [])
+
+    def create_new_function(self, full_location, body):
+        """
+        Creates a new cloud function in the given location, with the body specified.
+
+        :param full_location: full location including the project, in the form of
+            ``projects/<PROJECT>/locations/<LOCATION>``
+        :type full_location: str
+        :param body: body required by the Cloud Functions insert API
+        :type body: dict
+        :return: response returned by the operation
+        :rtype: dict
+        """
+        response = self.get_conn().projects().locations().functions().create(
+            location=full_location,
+            body=body
+        ).execute(num_retries=NUM_RETRIES)
+        operation_name = response["name"]
+        return self._wait_for_operation_to_complete(operation_name)
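+
+    # A minimal usage sketch (identifiers below are placeholders, not real
+    # resources): deploying a function with this hook directly could look like
+    #
+    #   hook = GcfHook(api_version='v1')
+    #   hook.create_new_function(
+    #       'projects/my-project/locations/europe-west1',
+    #       {'name': 'projects/my-project/locations/europe-west1'
+    #                '/functions/helloWorld',
+    #        'entryPoint': 'helloWorld', 'runtime': 'nodejs6',
+    #        'httpsTrigger': {}, 'sourceArchiveUrl': 'gs://bucket/source.zip'})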
+
+    def update_function(self, name, body, update_mask):
+        """
+        Updates the cloud function according to the update mask specified.
+
+        :param name: name of the function
+        :type name: str
+        :param body: body required by the Cloud Functions patch API
+        :type body: dict
+        :param update_mask: update mask - array of fields that should be patched
+        :type update_mask: [str]
+        :return: response returned by the operation
+        :rtype: dict
+        """
+        response = self.get_conn().projects().locations().functions().patch(
+            updateMask=",".join(update_mask),
+            name=name,
+            body=body
+        ).execute(num_retries=NUM_RETRIES)
+        operation_name = response["name"]
+        return self._wait_for_operation_to_complete(operation_name)
+
+    def upload_function_zip(self, parent, zip_path):
+        """
+        Uploads the zip file with the function sources.
+
+        :param parent: project and location in which the signed upload URL should be
+            generated, in the form of ``projects/<PROJECT>/locations/<LOCATION>``
+        :type parent: str
+        :param zip_path: path of the file to upload (should point to a valid .zip file)
+        :type zip_path: str
+        :return: upload URL that was returned by the generateUploadUrl method
+        :rtype: str
+        """
+        response = self.get_conn().projects().locations().functions().generateUploadUrl(
+            parent=parent
+        ).execute(num_retries=NUM_RETRIES)
+        upload_url = response.get('uploadUrl')
+        with open(zip_path, 'rb') as fp:
+            requests.put(
+                url=upload_url,
+                data=fp.read(),
+                # These two headers need to be specified according to:
+                # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl
+                # nopep8
+                headers={
+                    'Content-type': 'application/zip',
+                    'x-goog-content-length-range': '0,104857600',
+                }
+            )
+        return upload_url
+
+    def delete_function(self, name):
+        """
+        Deletes the cloud function specified by name.
+
+        :param name: name of the function
+        :type name: str
+        :return: response returned by the operation
+        :rtype: dict
+        """
+        response = self.get_conn().projects().locations().functions().delete(
+            name=name).execute(num_retries=NUM_RETRIES)
+        operation_name = response["name"]
+        return self._wait_for_operation_to_complete(operation_name)
+
+    def _wait_for_operation_to_complete(self, operation_name):
+        """
+        Waits for the named operation to complete - checks the status of the
+        asynchronous call.
+
+        :param operation_name: name of the operation
+        :type operation_name: str
+        :return: response returned by the operation
+        :rtype: dict
+        :exception: AirflowException in case an error is returned
+        """
+        service = self.get_conn()
+        while True:
+            operation_response = service.operations().get(
+                name=operation_name,
+            ).execute(num_retries=NUM_RETRIES)
+            if operation_response.get("done"):
+                response = operation_response.get("response")
+                error = operation_response.get("error")
+                # Note: according to the documentation, either response or error
+                # is always set when "done" == True
+                if error:
+                    raise AirflowException(str(error))
+                return response
+            time.sleep(TIME_TO_SLEEP_IN_SECONDS)
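+
+# For reference, _wait_for_operation_to_complete polls a google.longrunning
+# Operation resource. A finished operation looks roughly like this
+# (illustrative values only):
+#
+#   {"name": "operations/...", "done": True,
+#    "response": {"name": "projects/.../functions/helloWorld", ...}}
+#
+# while a failed one carries an "error" object instead of "response":
+#
+#   {"name": "operations/...", "done": True,
+#    "error": {"code": 3, "message": "Build failed ..."}}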
diff --git a/airflow/contrib/operators/gcp_function_operator.py b/airflow/contrib/operators/gcp_function_operator.py
new file mode 100644
index 0000000000000..4455307c93259
--- /dev/null
+++ b/airflow/contrib/operators/gcp_function_operator.py
@@ -0,0 +1,563 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import re
+
+from googleapiclient.errors import HttpError
+
+from airflow import AirflowException, LoggingMixin
+from airflow.version import version
+from airflow.models import BaseOperator
+from airflow.contrib.hooks.gcp_function_hook import GcfHook
+from airflow.utils.decorators import apply_defaults
+
+# TODO: This whole section should be extracted later to contrib/tools/field_validator.py
+
+COMPOSITE_FIELD_TYPES = ['union', 'dict']
+
+
+class FieldValidationException(AirflowException):
+    """
+    Thrown when validation finds a dictionary field that is not valid according
+    to the specification.
+    """
+
+    def __init__(self, message):
+        super(FieldValidationException, self).__init__(message)
+
+
+class ValidationSpecificationException(AirflowException):
+    """
+    Thrown when the validation specification itself is wrong
+    (rather than the dictionary being validated).
+    This should only happen during development, as ideally the
+    specification itself should never be invalid.
+    """
+
+    def __init__(self, message):
+        super(ValidationSpecificationException, self).__init__(message)
+
+
+# TODO: make better description, add some examples
+# TODO: move to contrib/utils folder when we reuse it.
+class BodyFieldValidator(LoggingMixin):
+    """
+    Validates the correctness of a request body according to a specification.
+    The specification can describe various types of fields, including custom
+    validation and unions of fields. This validator is meant to be reusable
+    by various operators in the near future, but for now it is left as part of
+    the Google Cloud Functions integration, so documentation about the
+    validator is not yet complete. To see what kind of specification can be
+    used, please take a look at gcp_function_operator.CLOUD_FUNCTION_VALIDATION,
+    which specifies validation for the GCF deploy operator.
+
+    :param validation_specs: dictionary describing the validation specification
+    :type validation_specs: [dict]
+    :param api_version: version of the API used (for example v1)
+    :type api_version: str
+
+    """
+    def __init__(self, validation_specs, api_version):
+        # type: ([dict], str) -> None
+        super(BodyFieldValidator, self).__init__()
+        self._validation_specs = validation_specs
+        self._api_version = api_version
+
+    @staticmethod
+    def _get_field_name_with_parent(field_name, parent):
+        if parent:
+            return parent + '.' + field_name
+        return field_name
+
+    @staticmethod
+    def _sanity_checks(children_validation_specs, field_type, full_field_path,
+                       regexp, custom_validation, value):
+        # type: (dict, str, str, str, function, object) -> None
+        if value is None and field_type != 'union':
+            raise FieldValidationException(
+                "The required body field '{}' is missing. Please add it.".
+                format(full_field_path))
+        if regexp and field_type:
+            raise ValidationSpecificationException(
+                "The validation specification entry '{}' has both type and regexp. "
+                "The regexp is only allowed without type (i.e. assume type is 'str' "
+                "that can be validated with regexp)".format(full_field_path))
+        if children_validation_specs and field_type not in COMPOSITE_FIELD_TYPES:
+            raise ValidationSpecificationException(
+                "Nested fields are specified in field '{}' of type '{}'. "
+                "Nested fields are only allowed for fields of those types: ('{}').".
+                format(full_field_path, field_type, COMPOSITE_FIELD_TYPES))
+        if custom_validation and field_type:
+            raise ValidationSpecificationException(
+                "The validation specification field '{}' has both type and "
+                "custom_validation. Custom validation is only allowed without type.".
+                format(full_field_path))
+
+    @staticmethod
+    def _validate_regexp(full_field_path, regexp, value):
+        # type: (str, str, str) -> None
+        if not re.match(regexp, value):
+            # Note: matching only the beginning, as we assume the regexps are all-or-nothing
+            raise FieldValidationException(
+                "The body field '{}' of value '{}' does not match the field "
+                "specification regexp: '{}'.".
+                format(full_field_path, value, regexp))
+
+    def _validate_dict(self, children_validation_specs, full_field_path, value):
+        # type: (dict, str, dict) -> None
+        for child_validation_spec in children_validation_specs:
+            self._validate_field(validation_spec=child_validation_spec,
+                                 dictionary_to_validate=value,
+                                 parent=full_field_path)
+        for field_name in value.keys():
+            if field_name not in [spec['name'] for spec in children_validation_specs]:
+                self.log.warning(
+                    "The field '{}' is in the body, but is not specified in the "
+                    "validation specification '{}'. "
+                    "This might be because you are using a newer API version and "
+                    "new field names are defined for that version. Then the warning "
+                    "can be safely ignored, or you might want to upgrade the operator "
+                    "to the version that supports the new API version.".format(
+                        self._get_field_name_with_parent(field_name, full_field_path),
+                        children_validation_specs))
+
+    def _validate_union(self, children_validation_specs, full_field_path,
+                        dictionary_to_validate):
+        # type: (dict, str, dict) -> None
+        field_found = False
+        found_field_name = None
+        for child_validation_spec in children_validation_specs:
+            # Forcing optional so that we do not have to type optional = True
+            # in the specification for all union fields
+            new_field_found = self._validate_field(
+                validation_spec=child_validation_spec,
+                dictionary_to_validate=dictionary_to_validate,
+                parent=full_field_path,
+                force_optional=True)
+            field_name = child_validation_spec['name']
+            if new_field_found and field_found:
+                raise FieldValidationException(
+                    "The mutually exclusive fields '{}' and '{}' belonging to the "
+                    "union '{}' are both present. Please remove one".
+                    format(field_name, found_field_name, full_field_path))
+            if new_field_found:
+                field_found = True
+                found_field_name = field_name
+        if not field_found:
+            self.log.warning(
+                "There is no '{}' union defined in the body {}. "
+                "Validation expected one of '{}' but could not find any. It's possible "
+                "that you are using a newer API version and there is another union variant "
+                "defined for that version. Then the warning can be safely ignored, "
+                "or you might want to upgrade the operator to the version that "
+                "supports the new API version.".format(
+                    full_field_path,
+                    dictionary_to_validate,
+                    [field['name'] for field in children_validation_specs]))
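+
+    # Illustration of a union specification (hypothetical, not part of
+    # CLOUD_FUNCTION_VALIDATION): union members are looked up as top-level
+    # keys of the validated dictionary, and at most one of them may be
+    # present (a missing union only produces a warning):
+    #
+    #   specs = [dict(name="source", type="union", fields=[
+    #       dict(name="pathA", regexp=r'^.+$'),
+    #       dict(name="pathB", regexp=r'^.+$')])]
+    #   BodyFieldValidator(specs, api_version='v1').validate({"pathA": "x"})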
+
+    def _validate_field(self, validation_spec, dictionary_to_validate, parent=None,
+                        force_optional=False):
+        """
+        Validates if the field is OK.
+
+        :param validation_spec: specification of the field
+        :type validation_spec: dict
+        :param dictionary_to_validate: dictionary where the field should be present
+        :type dictionary_to_validate: dict
+        :param parent: full path of the parent field
+        :type parent: str
+        :param force_optional: forces the field to be optional
+            (all union fields have force_optional set to True)
+        :type force_optional: bool
+        :return: True if the field is present
+        """
+        field_name = validation_spec['name']
+        field_type = validation_spec.get('type')
+        optional = validation_spec.get('optional')
+        regexp = validation_spec.get('regexp')
+        children_validation_specs = validation_spec.get('fields')
+        required_api_version = validation_spec.get('api_version')
+        custom_validation = validation_spec.get('custom_validation')
+
+        full_field_path = self._get_field_name_with_parent(field_name=field_name,
+                                                           parent=parent)
+        if required_api_version and required_api_version != self._api_version:
+            self.log.debug(
+                "Skipping validation of the field '{}' for API version '{}' "
+                "as it is only valid for API version '{}'".
+                format(field_name, self._api_version, required_api_version))
+            return False
+        value = dictionary_to_validate.get(field_name)
+
+        if (optional or force_optional) and value is None:
+            self.log.debug("The optional field '{}' is missing. That's perfectly OK.".
+                           format(full_field_path))
+            return False
+
+        # Certainly from here down the field is present (value is not None),
+        # so we should only return True from now on
+
+        self._sanity_checks(children_validation_specs=children_validation_specs,
+                            field_type=field_type,
+                            full_field_path=full_field_path,
+                            regexp=regexp,
+                            custom_validation=custom_validation,
+                            value=value)
+
+        if regexp:
+            self._validate_regexp(full_field_path, regexp, value)
+        elif field_type == 'dict':
+            if not isinstance(value, dict):
+                raise FieldValidationException(
+                    "The field '{}' should be dictionary type according to "
+                    "specification '{}' but it is '{}'".
+                    format(full_field_path, validation_spec, value))
+            if children_validation_specs is None:
+                self.log.debug(
+                    "The dict field '{}' has no nested fields defined in the "
+                    "specification '{}'. That's perfectly OK - its content will "
+                    "not be validated."
+                    .format(full_field_path, validation_spec))
+            else:
+                self._validate_dict(children_validation_specs, full_field_path, value)
+        elif field_type == 'union':
+            if not children_validation_specs:
+                raise ValidationSpecificationException(
+                    "The union field '{}' has no nested fields "
+                    "defined in specification '{}'. Unions should have at least one "
+                    "nested field defined.".format(full_field_path, validation_spec))
+            self._validate_union(children_validation_specs, full_field_path,
+                                 dictionary_to_validate)
+        elif custom_validation:
+            try:
+                custom_validation(value)
+            except Exception as e:
+                raise FieldValidationException(
+                    "Error while validating custom field '{}' specified by '{}': '{}'".
+                    format(full_field_path, validation_spec, e))
+        elif field_type is None:
+            self.log.debug("The type of field '{}' is not specified in '{}'. "
+                           "Not validating its content.".
+                           format(full_field_path, validation_spec))
+        else:
+            raise ValidationSpecificationException(
+                "The field '{}' is of type '{}' in specification '{}'. "
+                "This type is unknown to validation!".format(
+                    full_field_path, field_type, validation_spec))
+        return True
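+
+    # Sketch of the custom_validation escape hatch (hypothetical field name):
+    # any callable that raises on a bad value can stand in for type/regexp
+    # checks, mirroring _validate_available_memory_in_mb defined below:
+    #
+    #   def _must_be_positive(value):
+    #       if int(value) <= 0:
+    #           raise FieldValidationException("The value has to be positive")
+    #
+    #   spec = dict(name="replicas", optional=True,
+    #               custom_validation=_must_be_positive)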
+
+    def validate(self, body_to_validate):
+        """
+        Validates if the body (dictionary) follows the specification that the
+        validator was instantiated with. Raises ValidationSpecificationException
+        or FieldValidationException in case of problems with the specification
+        or with the body not conforming to the specification, respectively.
+
+        :param body_to_validate: body that must follow the specification
+        :type body_to_validate: dict
+        :return: None
+        """
+        try:
+            for validation_spec in self._validation_specs:
+                self._validate_field(validation_spec=validation_spec,
+                                     dictionary_to_validate=body_to_validate)
+        except FieldValidationException as e:
+            raise FieldValidationException(
+                "There was an error when validating the body '{}': '{}'".
+                format(body_to_validate, e))
+
+# TODO End of field validator to be extracted
+
+
+def _validate_available_memory_in_mb(value):
+    if int(value) <= 0:
+        raise FieldValidationException("The available memory has to be greater than 0")
+
+
+def _validate_max_instances(value):
+    if int(value) <= 0:
+        raise FieldValidationException(
+            "The max instances parameter has to be greater than 0")
+
+
+CLOUD_FUNCTION_VALIDATION = [
+    dict(name="name", regexp="^.+$"),
+    dict(name="description", regexp="^.+$", optional=True),
+    dict(name="entryPoint", regexp=r'^.+$', optional=True),
+    dict(name="runtime", regexp=r'^.+$', optional=True),
+    dict(name="timeout", regexp=r'^.+$', optional=True),
+    dict(name="availableMemoryMb", custom_validation=_validate_available_memory_in_mb,
+         optional=True),
+    dict(name="labels", optional=True),
+    dict(name="environmentVariables", optional=True),
+    dict(name="network", regexp=r'^.+$', optional=True),
+    dict(name="maxInstances", optional=True, custom_validation=_validate_max_instances),
+
+    dict(name="source_code", type="union", fields=[
+        dict(name="sourceArchiveUrl", regexp=r'^.+$'),
+        dict(name="sourceRepositoryUrl", regexp=r'^.+$', api_version='v1beta2'),
+        dict(name="sourceRepository", type="dict", fields=[
+            dict(name="url", regexp=r'^.+$')
+        ]),
+        dict(name="sourceUploadUrl")
+    ]),
+
+    dict(name="trigger", type="union", fields=[
+        dict(name="httpsTrigger", type="dict", fields=[
+            # This dict should be empty at input (url is added at output)
+        ]),
+        dict(name="eventTrigger", type="dict", fields=[
+            dict(name="eventType", regexp=r'^.+$'),
+            dict(name="resource", regexp=r'^.+$'),
+            dict(name="service", regexp=r'^.+$', optional=True),
+            dict(name="failurePolicy", type="dict", optional=True, fields=[
+                dict(name="retry", type="dict", optional=True)
+            ])
+        ])
+    ]),
+]
+
+
+class GcfFunctionDeployOperator(BaseOperator):
+    """
+    Create a function in Google Cloud Functions.
+
+    :param project_id: Project ID that the operator works on
+    :type project_id: str
+    :param location: Region where the function should be created
+    :type location: str
+    :param body: Body of the cloud function definition. The body must be a CloudFunction
+        dictionary as described in:
+        https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions
+        (note that different API versions require different
+        variants of the CloudFunction dictionary)
+    :type body: dict or google.cloud.functions.v1.CloudFunction
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: Version of the API used (for example v1).
+    :type api_version: str
+    :param zip_path: Path to zip file containing source code of the function.
If it is + set, then sourceUploadUrl should not be specified in the body (or it should + be empty), then the zip file will be uploaded using upload URL generated + via generateUploadUrl from cloud functions API + :type zip_path: str + :param validate_body: If set to False, no body validation is performed. + :type validate_body: bool + """ + + @apply_defaults + def __init__(self, + project_id, + location, + body, + gcp_conn_id='google_cloud_default', + api_version='v1', + zip_path=None, + validate_body=True, + *args, **kwargs): + self.project_id = project_id + self.location = location + self.full_location = 'projects/{}/locations/{}'.format(self.project_id, + self.location) + self.body = body + self.gcp_conn_id = gcp_conn_id + self.api_version = api_version + self.zip_path = zip_path + self.zip_path_preprocessor = ZipPathPreprocessor(body, zip_path) + self.validate_body = validate_body + self._field_validator = BodyFieldValidator(CLOUD_FUNCTION_VALIDATION, + api_version=api_version) + self._hook = GcfHook(gcp_conn_id=self.gcp_conn_id, api_version=self.api_version) + self._validate_inputs() + super(GcfFunctionDeployOperator, self).__init__(*args, **kwargs) + + def _validate_inputs(self): + if not self.project_id: + raise AirflowException("The required parameter 'project_id' is missing") + if not self.location: + raise AirflowException("The required parameter 'location' is missing") + if not self.body: + raise AirflowException("The required parameter 'body' is missing") + self.zip_path_preprocessor.preprocess_body() + + def _validate_all_body_fields(self): + self._field_validator.validate(self.body) + + def _create_new_function(self): + self._hook.create_new_function(self.full_location, self.body) + + def _update_function(self): + self._hook.update_function(self.body['name'], self.body, self.body.keys()) + + def _check_if_function_exists(self): + name = self.body.get('name') + if not name: + raise FieldValidationException("The 'name' field should be present in " + "body: '{}'.".format(self.body)) + try: + self._hook.get_function(name) + except HttpError as e: + status = e.resp.status + if status == 404: + return False + raise e + return True + + def _upload_source_code(self): + return self._hook.upload_function_zip(parent=self.full_location, + zip_path=self.zip_path) + + def _set_airflow_version_label(self): + if 'labels' not in self.body.keys(): + self.body['labels'] = {} + self.body['labels'].update( + {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}) + + def execute(self, context): + if self.zip_path_preprocessor.should_upload_function(): + self.body[SOURCE_UPLOAD_URL] = self._upload_source_code() + if self.validate_body: + self._validate_all_body_fields() + self._set_airflow_version_label() + if not self._check_if_function_exists(): + self._create_new_function() + else: + self._update_function() + + +SOURCE_ARCHIVE_URL = 'sourceArchiveUrl' +SOURCE_UPLOAD_URL = 'sourceUploadUrl' +SOURCE_REPOSITORY = 'sourceRepository' +ZIP_PATH = 'zip_path' + + +class ZipPathPreprocessor: + """ + Pre-processes zip path parameter. + + Responsible for checking if the zip path parameter is correctly specified in + relation with source_code body fields. Non empty zip path parameter is special because + it is mutually exclusive with sourceArchiveUrl and sourceRepository body fields. + It is also mutually exclusive with non-empty sourceUploadUrl. + The pre-process modifies sourceUploadUrl body field in special way when zip_path + is not empty. 
An extra step is then run when the
+    execute method is called: the sourceUploadUrl field value is set in the body
+    with the value returned by the generateUploadUrl Cloud Functions API method.
+
+    :param body: Body passed to the create/update method calls.
+    :type body: dict
+    :param zip_path: path to the zip file containing source code.
+    :type zip_path: str
+
+    """
+    upload_function = None
+
+    def __init__(self, body, zip_path):
+        self.body = body
+        self.zip_path = zip_path
+
+    @staticmethod
+    def _is_present_and_empty(dictionary, field):
+        return field in dictionary and not dictionary[field]
+
+    def _verify_upload_url_and_no_zip_path(self):
+        if self._is_present_and_empty(self.body, SOURCE_UPLOAD_URL):
+            if not self.zip_path:
+                raise AirflowException(
+                    "Parameter '{}' is empty in the body and argument '{}' "
+                    "is missing or empty. You need to have non empty '{}' "
+                    "when '{}' is present and empty.".
+                    format(SOURCE_UPLOAD_URL, ZIP_PATH, ZIP_PATH, SOURCE_UPLOAD_URL))
+
+    def _verify_upload_url_and_zip_path(self):
+        if SOURCE_UPLOAD_URL in self.body and self.zip_path:
+            if not self.body[SOURCE_UPLOAD_URL]:
+                self.upload_function = True
+            else:
+                raise AirflowException("Only one of '{}' in body or '{}' argument "
+                                       "allowed. Found both."
+                                       .format(SOURCE_UPLOAD_URL, ZIP_PATH))
+
+    def _verify_archive_url_and_zip_path(self):
+        if SOURCE_ARCHIVE_URL in self.body and self.zip_path:
+            raise AirflowException("Only one of '{}' in body or '{}' argument "
+                                   "allowed. Found both."
+                                   .format(SOURCE_ARCHIVE_URL, ZIP_PATH))
+
+    def should_upload_function(self):
+        if self.upload_function is None:
+            raise AirflowException('preprocess_body() method has to be invoked before '
+                                   'should_upload_function()')
+        return self.upload_function
+
+    def preprocess_body(self):
+        self._verify_archive_url_and_zip_path()
+        self._verify_upload_url_and_zip_path()
+        self._verify_upload_url_and_no_zip_path()
+        if self.upload_function is None:
+            self.upload_function = False
+
+
+FUNCTION_NAME_PATTERN = '^projects/[^/]+/locations/[^/]+/functions/[^/]+$'
+FUNCTION_NAME_COMPILED_PATTERN = re.compile(FUNCTION_NAME_PATTERN)
+
+
+class GcfFunctionDeleteOperator(BaseOperator):
+    """
+    Deletes the function with the specified name from Google Cloud Functions.
+
+    :param name: A fully-qualified function name, matching
+        the pattern: `^projects/[^/]+/locations/[^/]+/functions/[^/]+$`
+    :type name: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: Version of the API used (for example v1).
+    :type api_version: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 name,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1',
+                 *args, **kwargs):
+        self.name = name
+        self.gcp_conn_id = gcp_conn_id
+        self.api_version = api_version
+        self._validate_inputs()
+        self.hook = GcfHook(gcp_conn_id=self.gcp_conn_id, api_version=self.api_version)
+        super(GcfFunctionDeleteOperator, self).__init__(*args, **kwargs)
+
+    def _validate_inputs(self):
+        if not self.name:
+            raise AttributeError('Empty parameter: name')
+        else:
+            pattern = FUNCTION_NAME_COMPILED_PATTERN
+            if not pattern.match(self.name):
+                raise AttributeError(
+                    'Parameter name must match pattern: {}'.format(FUNCTION_NAME_PATTERN))
+
+    def execute(self, context):
+        try:
+            return self.hook.delete_function(self.name)
+        except HttpError as e:
+            status = e.resp.status
+            if status == 404:
+                self.log.info('The function does not exist in this project')
+            else:
+                self.log.error('An error occurred. Exiting.')
+                raise e
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index efd544efed3ba..0d973a391cc7b 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -101,3 +101,123 @@ to execute a BigQuery load job.
     :dedent: 4
     :start-after: [START howto_operator_gcs_to_bq]
     :end-before: [END howto_operator_gcs_to_bq]
+
+GcfFunctionDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the ``default_args`` dict to pass arguments to the operator.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_delete_args]
+    :end-before: [END howto_operator_gcf_delete_args]
+
+
+Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`
+to delete a function from Google Cloud Functions.
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_delete]
+    :end-before: [END howto_operator_gcf_delete]
+
+Troubleshooting
+"""""""""""""""
+
+If you run the operator with a service account and get "forbidden 403" errors,
+your service account does not have enough permissions set via IAM:
+
+* First you need to assign your service account the "Cloud Functions Developer" role.
+* Make sure you grant the service account the IAM Service Account User role on the
+  Cloud Functions runtime service account. A typical way of doing it with gcloud is
+  shown below - just replace PROJECT_ID with the ID of your project and
+  SERVICE_ACCOUNT_EMAIL with the email of your service account.
+
+.. code-block:: bash
+
+  gcloud iam service-accounts add-iam-policy-binding \
+    PROJECT_ID@appspot.gserviceaccount.com \
+    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
+    --role="roles/iam.serviceAccountUser"
+
+See `Adding the IAM service agent user role to the runtime service
+<https://cloud.google.com/functions/docs/reference/iam/roles>`_ for details.
+
+GcfFunctionDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`
+to deploy a function to Google Cloud Functions.
+
+The examples below use Airflow Variables in order to show the various variants and
+combinations of ``default_args`` you can use. The variables are defined as follows:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_deploy_variables]
+    :end-before: [END howto_operator_gcf_deploy_variables]
+
+With those variables one can define the body of the request:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_deploy_body]
+    :end-before: [END howto_operator_gcf_deploy_body]
+
+The ``default_args`` dictionary passed when you create the DAG can be used to pass
+the body and other arguments:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_deploy_args]
+    :end-before: [END howto_operator_gcf_deploy_args]
+
+Note that neither the body nor the default args are complete in the above examples.
+Depending on the variables set, there are different variants of how to pass the
+source code related fields. Currently you can pass either ``sourceArchiveUrl``,
+``sourceRepository`` or ``sourceUploadUrl`` as described in the
+`CloudFunction API specification
+<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions>`_.
+Additionally, ``default_args`` might contain a ``zip_path`` parameter to run an
+extra step of uploading the source code before deploying it. In that case, you also
+need to provide an empty ``sourceUploadUrl`` parameter in the body.
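+
+For example, a body prepared for the upload variant could look as follows. This is
+a sketch with placeholder values; the empty ``sourceUploadUrl`` is filled in by the
+operator itself after it calls ``generateUploadUrl``:
+
+.. code-block:: python
+
+    body = {
+        "name": "projects/my-project/locations/europe-west1/functions/helloWorld",
+        "entryPoint": "helloWorld",
+        "runtime": "nodejs6",
+        "httpsTrigger": {},
+        "sourceUploadUrl": "",
+    }
+    default_args["zip_path"] = "/path/to/source.zip"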
+
+Example logic of setting the source code related fields based on the variables defined
+above is shown here:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_deploy_variants]
+    :end-before: [END howto_operator_gcf_deploy_variants]
+
+The code to create the operator:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+    :language: python
+    :start-after: [START howto_operator_gcf_deploy]
+    :end-before: [END howto_operator_gcf_deploy]
+
+Troubleshooting
+"""""""""""""""
+
+If you run the deploy operator with a service account and get "forbidden 403" errors,
+your service account does not have enough permissions set via IAM:
+
+* First you need to assign your service account the "Cloud Functions Developer" role.
+* Make sure you grant the service account the IAM Service Account User role on the
+  Cloud Functions runtime service account. A typical way of doing it with gcloud is
+  shown below - just replace PROJECT_ID with the ID of your project and
+  SERVICE_ACCOUNT_EMAIL with the email of your service account.
+
+.. code-block:: bash
+
+  gcloud iam service-accounts add-iam-policy-binding \
+    PROJECT_ID@appspot.gserviceaccount.com \
+    --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
+    --role="roles/iam.serviceAccountUser"
+
+See `Adding the IAM service agent user role to the runtime service
+<https://cloud.google.com/functions/docs/reference/iam/roles>`_ for details.
+
+Also make sure that your service account has access to the source code of the function
+if it needs to be downloaded. This may mean adding the Source Repository Viewer
+role to the service account if the source code is in a Google Source Repository.
diff --git a/docs/integration.rst b/docs/integration.rst
index 522d4bbd446ab..c4800d65ac298 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -448,6 +448,40 @@ BigQueryHook
     :members:
 
 
+Cloud Functions
+'''''''''''''''
+
+Cloud Functions Operators
+"""""""""""""""""""""""""
+
+- :ref:`GcfFunctionDeployOperator` : deploy a Google Cloud Function to the cloud.
+- :ref:`GcfFunctionDeleteOperator` : delete a Google Cloud Function in the cloud.
+
+.. _GcfFunctionDeployOperator:
+
+GcfFunctionDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator
+
+
+.. _GcfFunctionDeleteOperator:
+
+GcfFunctionDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator
+
+
+Cloud Functions Hook
+""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.gcp_function_hook.GcfHook
+    :members:
+
+
 Cloud DataFlow
 ''''''''''''''
diff --git a/tests/contrib/operators/test_gcp_function_operator.py b/tests/contrib/operators/test_gcp_function_operator.py
new file mode 100644
index 0000000000000..d7585ae66fdef
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_function_operator.py
@@ -0,0 +1,624 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from googleapiclient.errors import HttpError +from parameterized import parameterized + +from airflow.contrib.operators.gcp_function_operator import \ + GcfFunctionDeployOperator, GcfFunctionDeleteOperator, FUNCTION_NAME_PATTERN +from airflow import AirflowException +from airflow.version import version + +from copy import deepcopy + +try: + # noinspection PyProtectedMember + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +EMPTY_CONTENT = ''.encode('utf8') +MOCK_RESP_404 = type('', (object,), {"status": 404})() + +PROJECT_ID = 'test_project_id' +LOCATION = 'test_region' +SOURCE_ARCHIVE_URL = 'gs://folder/file.zip' +ENTRYPOINT = 'helloWorld' +FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION, + ENTRYPOINT) +RUNTIME = 'nodejs6' +VALID_RUNTIMES = ['nodejs6', 'nodejs8', 'python37'] +VALID_BODY = { + "name": FUNCTION_NAME, + "entryPoint": ENTRYPOINT, + "runtime": RUNTIME, + "httpsTrigger": {}, + "sourceArchiveUrl": SOURCE_ARCHIVE_URL +} + + +def _prepare_test_bodies(): + body_no_name = deepcopy(VALID_BODY) + body_no_name.pop('name', None) + body_empty_entry_point = deepcopy(VALID_BODY) + body_empty_entry_point['entryPoint'] = '' + body_empty_runtime = deepcopy(VALID_BODY) + body_empty_runtime['runtime'] = '' + body_values = [ + ({}, "The required parameter 'body' is missing"), + (body_no_name, "The required body field 'name' is missing"), + (body_empty_entry_point, + "The body field 'entryPoint' of value '' does not match"), + (body_empty_runtime, "The body field 'runtime' of value '' does not match"), + ] + return body_values + + +class GcfFunctionDeployTest(unittest.TestCase): + @parameterized.expand(_prepare_test_bodies()) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_body_empty_or_missing_fields(self, body, message, mock_hook): + mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl' + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn(message, str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_deploy_execute(self, mock_hook): + mock_hook.return_value.get_function.side_effect = mock.Mock( + side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found')) + mock_hook.return_value.create_new_function.return_value = True + op = GcfFunctionDeployOperator( + project_id=PROJECT_ID, + location=LOCATION, + body=deepcopy(VALID_BODY), + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.get_function.assert_called_once_with( + 
'projects/test_project_id/locations/test_region/functions/helloWorld' + ) + expected_body = deepcopy(VALID_BODY) + expected_body['labels'] = { + 'airflow-version': 'v' + version.replace('.', '-').replace('+', '-') + } + mock_hook.return_value.create_new_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region', + expected_body + ) + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_update_function_if_exists(self, mock_hook): + mock_hook.return_value.get_function.return_value = True + mock_hook.return_value.update_function.return_value = True + op = GcfFunctionDeployOperator( + project_id=PROJECT_ID, + location=LOCATION, + body=deepcopy(VALID_BODY), + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.get_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region/functions/helloWorld' + ) + expected_body = deepcopy(VALID_BODY) + expected_body['labels'] = { + 'airflow-version': 'v' + version.replace('.', '-').replace('+', '-') + } + mock_hook.return_value.update_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region/functions/helloWorld', + expected_body, expected_body.keys()) + mock_hook.return_value.create_new_function.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_empty_project_id(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GcfFunctionDeployOperator( + project_id="", + location="test_region", + body=None, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter 'project_id' is missing", str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_empty_location(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GcfFunctionDeployOperator( + project_id="test_project_id", + location="", + body=None, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter 'location' is missing", str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_empty_body(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=None, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter 'body' is missing", str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + @parameterized.expand([ + (runtime,) for runtime in VALID_RUNTIMES + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_correct_runtime_field(self, runtime, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['runtime'] = runtime + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + (network,) for network in [ + "network-01", + "n-0-2-3-4", + 
"projects/PROJECT/global/networks/network-01" + "projects/PRÓJECT/global/networks/netwórk-01" + ] + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_valid_network_field(self, network, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['network'] = network + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + (labels,) for labels in [ + {}, + {"label": 'value-01'}, + {"label_324234_a_b_c": 'value-01_93'}, + ] + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_valid_labels_field(self, labels, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['labels'] = labels + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_validation_disabled(self, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = { + "name": "function_name", + "some_invalid_body_field": "some_invalid_body_field_value" + } + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + validate_body=False, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_body_validation_simple(self, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body['name'] = '' + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn("The body field 'name' of value '' does not match", + str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + ('name', '', + "The body field 'name' of value '' does not match"), + ('description', '', "The body field 'description' of value '' does not match"), + ('entryPoint', '', "The body field 'entryPoint' of value '' does not match"), + ('availableMemoryMb', '0', + "The available memory has to be greater than 0"), + ('availableMemoryMb', '-1', + "The available memory has to be greater than 0"), + ('availableMemoryMb', 'ss', + "invalid literal for int() with base 10: 'ss'"), + ('network', '', "The body field 'network' of value '' does not match"), + ('maxInstances', '0', "The max instances parameter has to be greater than 0"), + ('maxInstances', '-1', "The max instances parameter has to be greater than 0"), + ('maxInstances', 'ss', "invalid literal for int() with base 10: 'ss'"), + ]) + 
@mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_invalid_field_values(self, key, value, message, mock_hook): + mock_hook.return_value.list_functions.return_value = [] + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body[key] = value + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn(message, str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + ({'sourceArchiveUrl': ''}, + "The body field 'source_code.sourceArchiveUrl' of value '' does not match"), + ({'sourceArchiveUrl': '', 'zip_path': '/path/to/file'}, + "Only one of 'sourceArchiveUrl' in body or 'zip_path' argument allowed."), + ({'sourceArchiveUrl': 'gs://url', 'zip_path': '/path/to/file'}, + "Only one of 'sourceArchiveUrl' in body or 'zip_path' argument allowed."), + ({'sourceArchiveUrl': '', 'sourceUploadUrl': ''}, + "Parameter 'sourceUploadUrl' is empty in the body and argument " + "'zip_path' is missing or empty."), + ({'sourceArchiveUrl': 'gs://adasda', 'sourceRepository': ''}, + "The field 'source_code.sourceRepository' should be dictionary type"), + ({'sourceUploadUrl': '', 'sourceRepository': ''}, + "Parameter 'sourceUploadUrl' is empty in the body and argument 'zip_path' " + "is missing or empty."), + ({'sourceArchiveUrl': '', 'sourceUploadUrl': '', 'sourceRepository': ''}, + "Parameter 'sourceUploadUrl' is empty in the body and argument 'zip_path' " + "is missing or empty."), + ({'sourceArchiveUrl': 'gs://url', 'sourceUploadUrl': 'https://url'}, + "The mutually exclusive fields 'sourceUploadUrl' and 'sourceArchiveUrl' " + "belonging to the union 'source_code' are both present. Please remove one"), + ({'sourceUploadUrl': 'https://url', 'zip_path': '/path/to/file'}, + "Only one of 'sourceUploadUrl' in body " + "or 'zip_path' argument allowed. 
Found both."), + ({'sourceUploadUrl': ''}, "Parameter 'sourceUploadUrl' is empty in the body " + "and argument 'zip_path' is missing or empty."), + ({'sourceRepository': ''}, "The field 'source_code.sourceRepository' " + "should be dictionary type"), + ({'sourceRepository': {}}, "The required body field " + "'source_code.sourceRepository.url' is missing"), + ({'sourceRepository': {'url': ''}}, + "The body field 'source_code.sourceRepository.url' of value '' does not match"), + ] + ) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_invalid_source_code_union_field(self, source_code, message, mock_hook): + mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl' + body = deepcopy(VALID_BODY) + body.pop('sourceUploadUrl', None) + body.pop('sourceArchiveUrl', None) + zip_path = source_code.pop('zip_path', None) + body.update(source_code) + with self.assertRaises(AirflowException) as cm: + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id", + zip_path=zip_path + ) + op.execute(None) + err = cm.exception + self.assertIn(message, str(err)) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.reset_mock() + + @parameterized.expand([ + ({'sourceArchiveUrl': 'gs://url'},), + ({'zip_path': '/path/to/file', 'sourceUploadUrl': None},), + ({'sourceUploadUrl': + 'https://source.developers.google.com/projects/a/repos/b/revisions/c/paths/d'},), + ({'sourceRepository': + {'url': 'https://source.developers.google.com/projects/a/' + 'repos/b/revisions/c/paths/d'}},), + ]) + @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook') + def test_valid_source_code_union_field(self, source_code, mock_hook): + mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl' + mock_hook.return_value.get_function.side_effect = mock.Mock( + side_effect=HttpError(resp=MOCK_RESP_404, content=b'not found')) + mock_hook.return_value.create_new_function.return_value = True + body = deepcopy(VALID_BODY) + body.pop('sourceUploadUrl', None) + body.pop('sourceArchiveUrl', None) + body.pop('sourceRepository', None) + body.pop('sourceRepositoryUrl', None) + zip_path = source_code.pop('zip_path', None) + body.update(source_code) + op = GcfFunctionDeployOperator( + project_id="test_project_id", + location="test_region", + body=body, + task_id="id", + zip_path=zip_path + ) + op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + if zip_path: + mock_hook.return_value.upload_function_zip.assert_called_once_with( + parent='projects/test_project_id/locations/test_region', + zip_path='/path/to/file' + ) + mock_hook.return_value.get_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region/functions/helloWorld' + ) + mock_hook.return_value.create_new_function.assert_called_once_with( + 'projects/test_project_id/locations/test_region', + body + ) + mock_hook.reset_mock() + + @parameterized.expand([ + ({'eventTrigger': {}}, + "The required body field 'trigger.eventTrigger.eventType' is missing"), + ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b'}}, + "The required body field 'trigger.eventTrigger.resource' is missing"), + ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b', 'resource': ''}}, + "The body field 'trigger.eventTrigger.resource' of value '' does not match"), + ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b', + 
+    @parameterized.expand([
+        ({'eventTrigger': {}},
+         "The required body field 'trigger.eventTrigger.eventType' is missing"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b'}},
+         "The required body field 'trigger.eventTrigger.resource' is missing"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b', 'resource': ''}},
+         "The body field 'trigger.eventTrigger.resource' of value '' does not match"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': ''}},
+         "The body field 'trigger.eventTrigger.service' of value '' does not match"),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': 'service_name',
+                           'failurePolicy': {'retry': ''}}},
+         "The field 'trigger.eventTrigger.failurePolicy.retry' "
+         "should be dictionary type")
+    ])
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_invalid_trigger_union_field(self, trigger, message, mock_hook):
+        mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl'
+        body = deepcopy(VALID_BODY)
+        body.pop('httpsTrigger', None)
+        body.pop('eventTrigger', None)
+        body.update(trigger)
+        with self.assertRaises(AirflowException) as cm:
+            op = GcfFunctionDeployOperator(
+                project_id="test_project_id",
+                location="test_region",
+                body=body,
+                task_id="id",
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn(message, str(err))
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.reset_mock()
+
+    @parameterized.expand([
+        ({'httpsTrigger': {}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res'}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': 'service_name'}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/ą.b',
+                           'resource': 'reś',
+                           'service': 'service_namę'}},),
+        ({'eventTrigger': {'eventType': 'providers/test/eventTypes/a.b',
+                           'resource': 'res',
+                           'service': 'service_name',
+                           'failurePolicy': {'retry': {}}}},)
+    ])
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_valid_trigger_union_field(self, trigger, mock_hook):
+        mock_hook.return_value.upload_function_zip.return_value = 'https://uploadUrl'
+        mock_hook.return_value.get_function.side_effect = \
+            HttpError(resp=MOCK_RESP_404, content=b'not found')
+        mock_hook.return_value.create_new_function.return_value = True
+        body = deepcopy(VALID_BODY)
+        body.pop('httpsTrigger', None)
+        body.pop('eventTrigger', None)
+        body.update(trigger)
+        op = GcfFunctionDeployOperator(
+            project_id="test_project_id",
+            location="test_region",
+            body=body,
+            task_id="id",
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.get_function.assert_called_once_with(
+            'projects/test_project_id/locations/test_region/functions/helloWorld'
+        )
+        mock_hook.return_value.create_new_function.assert_called_once_with(
+            'projects/test_project_id/locations/test_region',
+            body
+        )
+        mock_hook.reset_mock()
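
The trigger union field mirrors the source-code one: the body carries either an
'httpsTrigger' (which may be an empty dict, since GCF assigns the URL itself) or an
'eventTrigger' with a required 'eventType' and 'resource', an optional non-empty
'service', and an optional 'failurePolicy' whose 'retry' must be a dict. Purely for
illustration, the smallest bodies the valid cases above accept look like this (the
Pub/Sub values are placeholders, not taken from the tests):

    https_triggered = {'httpsTrigger': {}}  # the trigger URL is assigned by GCF

    event_triggered = {
        'eventTrigger': {
            'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish',
            'resource': 'projects/my-project/topics/my-topic',  # required
            'service': 'pubsub.googleapis.com',      # optional, must be non-empty
            'failurePolicy': {'retry': {}},          # optional, must be a dict
        }
    }
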
+class GcfFunctionDeleteTest(unittest.TestCase):
+    _FUNCTION_NAME = 'projects/project_name/locations/project_location/functions' \
+                     '/function_name'
+    _DELETE_FUNCTION_EXPECTED = {
+        '@type': 'type.googleapis.com/google.cloud.functions.v1.CloudFunction',
+        'name': _FUNCTION_NAME,
+        'sourceArchiveUrl': 'gs://functions/hello.zip',
+        'httpsTrigger': {
+            'url': 'https://project_location-project_name.cloudfunctions.net'
+                   '/function_name'},
+        'status': 'ACTIVE',
+        'entryPoint': 'entry_point',
+        'timeout': '60s',
+        'availableMemoryMb': 256,
+        'serviceAccountEmail': 'project_name@appspot.gserviceaccount.com',
+        'updateTime': '2018-08-23T00:00:00Z',
+        'versionId': '1',
+        'runtime': 'nodejs6'}
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_delete_execute(self, mock_hook):
+        mock_hook.return_value.delete_function.return_value = \
+            self._DELETE_FUNCTION_EXPECTED
+        op = GcfFunctionDeleteOperator(
+            name=self._FUNCTION_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.delete_function.assert_called_once_with(
+            'projects/project_name/locations/project_location/functions/function_name'
+        )
+        self.assertEqual(result['name'], self._FUNCTION_NAME)
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_correct_name(self, mock_hook):
+        op = GcfFunctionDeleteOperator(
+            name="projects/project_name/locations/project_location/functions"
+                 "/function_name",
+            task_id="id"
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_invalid_name(self, mock_hook):
+        with self.assertRaises(AttributeError) as cm:
+            op = GcfFunctionDeleteOperator(
+                name="invalid_name",
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertEqual(str(err), 'Parameter name must match pattern: {}'.format(
+            FUNCTION_NAME_PATTERN))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_empty_name(self, mock_hook):
+        mock_hook.return_value.delete_function.return_value = \
+            self._DELETE_FUNCTION_EXPECTED
+        with self.assertRaises(AttributeError) as cm:
+            GcfFunctionDeleteOperator(
+                name="",
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertEqual(str(err), 'Empty parameter: name')
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_gcf_error_silenced_when_function_doesnt_exist(self, mock_hook):
+        op = GcfFunctionDeleteOperator(
+            name=self._FUNCTION_NAME,
+            task_id="id"
+        )
+        mock_hook.return_value.delete_function.side_effect = \
+            HttpError(resp=MOCK_RESP_404, content=b'not found')
+        op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.delete_function.assert_called_once_with(
+            'projects/project_name/locations/project_location/functions/function_name'
+        )
+
+    @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
+    def test_non_404_gcf_error_bubbled_up(self, mock_hook):
+        op = GcfFunctionDeleteOperator(
+            name=self._FUNCTION_NAME,
+            task_id="id"
+        )
+        resp = type('', (object,), {"status": 500})()
+        mock_hook.return_value.delete_function.side_effect = \
+            HttpError(resp=resp, content=b'error')
+
+        with self.assertRaises(HttpError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.delete_function.assert_called_once_with(
+            'projects/project_name/locations/project_location/functions/function_name'
+        )
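
The last two tests fix the delete operator's error contract: a 404 from the API (the
function is already gone) counts as success, while any other HttpError must propagate
to the caller. A minimal sketch of that contract, assuming a free-standing helper
('delete_ignoring_missing' is hypothetical, not the operator's actual source):

    from googleapiclient.errors import HttpError

    def delete_ignoring_missing(hook, name):
        try:
            return hook.delete_function(name)
        except HttpError as e:
            # The function was already deleted - treat it as success.
            if int(e.resp.status) == 404:
                return None
            # Anything else (auth failures, 5xx, ...) should bubble up.
            raise
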