Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions airflow/contrib/example_dags/example_gcp_function_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that deletes a Google Cloud Function.
This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
* PROJECT_ID - Google Cloud Project where the Cloud Function exists.
* LOCATION - Google Cloud Functions region where the function exists.
* ENTRYPOINT - Name of the executable function in the source code.
"""

import datetime

import airflow
from airflow import models
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator

# [START howto_operator_gcf_delete_args]
# Every setting comes from an Airflow Variable; '' is used when it is not defined.
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')

# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(
    PROJECT_ID, LOCATION, ENTRYPOINT)

# Arguments shared by every task of the DAG
default_args = {'start_date': airflow.utils.dates.days_ago(1)}
# [END howto_operator_gcf_delete_args]

# The DAG runs once a day and consists of a single task that deletes the function.
with models.DAG(
        dag_id='example_gcp_function_delete',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
) as dag:
    # [START howto_operator_gcf_delete]
    t1 = GcfFunctionDeleteOperator(task_id="gcf_delete_task", name=FUNCTION_NAME)
    # [END howto_operator_gcf_delete]
113 changes: 113 additions & 0 deletions airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that creates a Google Cloud Function and then deletes it.

This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
* PROJECT_ID - Google Cloud Project to use for the Cloud Function.
* LOCATION - Google Cloud Functions region where the function should be
created.
* SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
or
* SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
or
* ZIP_PATH - Local path to the zipped source archive
or
* SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function is
defined in a supported Cloud Source Repository URL format
https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
* ENTRYPOINT - Name of the executable function in the source code.
* VALIDATE_BODY - Optional; passed to the operators as ``validate_body`` (defaults to True).
"""

import datetime

from airflow import models
from airflow.contrib.operators.gcp_function_operator \
import GcfFunctionDeployOperator, GcfFunctionDeleteOperator
from airflow.utils import dates

# [START howto_operator_gcf_deploy_variables]
# Every setting comes from an Airflow Variable; '' is used when it is not defined.
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
# Fully-qualified name of the function that is deployed and then deleted
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(
    PROJECT_ID, LOCATION, ENTRYPOINT)
RUNTIME = 'nodejs6'
# A stored Variable is returned as a string, so a configured "False" or "0" would be
# truthy if used as-is. Parse it explicitly; only the listed spellings (and the empty
# string) disable validation, everything else - including the default - enables it.
VALIDATE_BODY = str(models.Variable.get('VALIDATE_BODY', True)).strip().lower() not in (
    '', '0', 'false', 'no', 'off')

# [END howto_operator_gcf_deploy_variables]

# [START howto_operator_gcf_deploy_body]
# Function description sent with the deploy request. The source location is not
# set here; it is added below once the configured source variant is known.
body = {
    'name': FUNCTION_NAME,
    'entryPoint': ENTRYPOINT,
    'runtime': RUNTIME,
    'httpsTrigger': {},
}
# [END howto_operator_gcf_deploy_body]

# [START howto_operator_gcf_deploy_args]
# Arguments shared by the tasks of the DAG, so they are not repeated per operator.
default_args = {
    'start_date': dates.days_ago(1),
    'project_id': PROJECT_ID,
    'location': LOCATION,
    'body': body,
    'validate_body': VALIDATE_BODY,
}
# [END howto_operator_gcf_deploy_args]

# [START howto_operator_gcf_deploy_variants]
# At least one source variable has to be configured, otherwise there is nothing to deploy.
if not (SOURCE_ARCHIVE_URL or SOURCE_REPOSITORY or ZIP_PATH or SOURCE_UPLOAD_URL):
    raise Exception("Please provide one of the source_code parameters")

# Only one variant is used; when several are configured the first match below wins.
if SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
    body['sourceRepository'] = {'url': SOURCE_REPOSITORY}
elif ZIP_PATH:
    # The upload URL is left empty and the local archive path is handed over instead.
    # NOTE(review): presumably the deploy operator uploads the archive and fills in
    # the URL itself - confirm against GcfFunctionDeployOperator.
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = ZIP_PATH
else:
    # SOURCE_UPLOAD_URL is the only variant left at this point
    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
# [END howto_operator_gcf_deploy_variants]


# The DAG runs once a day: the function is deployed first and deleted afterwards.
with models.DAG(
        dag_id='example_gcp_function_deploy_delete',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
) as dag:
    # [START howto_operator_gcf_deploy]
    deploy_task = GcfFunctionDeployOperator(
        task_id="gcf_deploy_task",
        name=FUNCTION_NAME
    )
    # [END howto_operator_gcf_deploy]
    delete_task = GcfFunctionDeleteOperator(task_id="gcf_delete_task", name=FUNCTION_NAME)
    # The delete task only runs after the deploy task
    deploy_task >> delete_task
195 changes: 195 additions & 0 deletions airflow/contrib/hooks/gcp_function_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import time
import requests
from googleapiclient.discovery import build

from airflow import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

# Number of retries - used by googleapiclient method calls to perform retries
# For requests that are "retriable"
NUM_RETRIES = 5

# Time to sleep between active checks of the operation results
TIME_TO_SLEEP_IN_SECONDS = 1


# noinspection PyAbstractClass
class GcfHook(GoogleCloudBaseHook):
    """
    Hook for Google Cloud Functions APIs.

    :param api_version: version of the Cloud Functions API to build the client for
        (e.g. ``v1``)
    :type api_version: str
    :param gcp_conn_id: connection id, forwarded to GoogleCloudBaseHook
    :type gcp_conn_id: str
    :param delegate_to: forwarded to GoogleCloudBaseHook
    :type delegate_to: str
    """
    # API client built lazily by get_conn() and then reused by this hook instance
    _conn = None

    def __init__(self,
                 api_version,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None):
        super(GcfHook, self).__init__(gcp_conn_id, delegate_to)
        self.api_version = api_version

    def get_conn(self):
        """
        Retrieves connection to cloud functions.

        The client is built on the first call and cached for subsequent calls.

        :return: Google Cloud Functions service object
        :rtype: googleapiclient.discovery.Resource
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build('cloudfunctions', self.api_version,
                               http=http_authorized, cache_discovery=False)
        return self._conn

    def get_function(self, name):
        """
        Returns the function with a given name.

        :param name: name of the function
        :type name: str
        :return: a CloudFunction object representing the function
        :rtype: dict
        """
        return self.get_conn().projects().locations().functions().get(
            name=name).execute(num_retries=NUM_RETRIES)

    def list_functions(self, full_location):
        """
        Lists all functions created in the location.

        :param full_location: full location including project, in the form
            of projects/<PROJECT>/locations/<LOCATION>
        :type full_location: str
        :return: array of CloudFunction objects - representing functions in the
            location; empty when the response contains no "functions" entry
        :rtype: [dict]
        """
        list_response = self.get_conn().projects().locations().functions().list(
            parent=full_location).execute(num_retries=NUM_RETRIES)
        return list_response.get("functions", [])

    def create_new_function(self, full_location, body):
        """
        Creates new cloud function in location given with body specified.

        Blocks until the asynchronous create operation has finished.

        :param full_location: full location including project, in the form
            of projects/<PROJECT>/locations/<LOCATION>
        :type full_location: str
        :param body: body required by the cloud function insert API
        :type body: dict
        :return: response returned by the operation
        :rtype: dict
        :exception: AirflowException in case the operation finished with an error
        """
        response = self.get_conn().projects().locations().functions().create(
            location=full_location,
            body=body
        ).execute(num_retries=NUM_RETRIES)
        operation_name = response["name"]
        return self._wait_for_operation_to_complete(operation_name)

    def update_function(self, name, body, update_mask):
        """
        Updates cloud function according to the update mask specified.

        Blocks until the asynchronous patch operation has finished.

        :param name: name of the function
        :type name: str
        :param body: body required by the cloud function patch API
        :type body: dict
        :param update_mask: update mask - array of fields that should be patched
        :type update_mask: [str]
        :return: response returned by the operation
        :rtype: dict
        :exception: AirflowException in case the operation finished with an error
        """
        response = self.get_conn().projects().locations().functions().patch(
            # The API takes the mask as one comma-separated string
            updateMask=",".join(update_mask),
            name=name,
            body=body
        ).execute(num_retries=NUM_RETRIES)
        operation_name = response["name"]
        return self._wait_for_operation_to_complete(operation_name)

    def upload_function_zip(self, parent, zip_path):
        """
        Uploads zip file with sources.

        :param parent: project and location in which signed upload URL should be
            generated, in the form of projects/<PROJECT>/locations/<LOCATION>
        :type parent: str
        :param zip_path: path of the file to upload (should point to valid .zip file)
        :type zip_path: str
        :return: Upload URL that was returned by generateUploadUrl method
        :rtype: str
        :exception: AirflowException in case the upload is rejected
        """
        response = self.get_conn().projects().locations().functions().generateUploadUrl(
            parent=parent
        ).execute(num_retries=NUM_RETRIES)
        upload_url = response.get('uploadUrl')
        with open(zip_path, 'rb') as fp:
            upload_response = requests.put(
                url=upload_url,
                # The open file is passed so that the archive is streamed rather than
                # loaded into memory as a whole
                data=fp,
                # Those two headers needs to be specified according to:
                # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl
                # nopep8
                headers={
                    'Content-type': 'application/zip',
                    'x-goog-content-length-range': '0,104857600',
                }
            )
        # Fail here rather than letting a later deploy fail on a missing archive
        if not upload_response.ok:
            raise AirflowException(
                "Upload of {} to the generated upload URL failed with HTTP status {}".format(
                    zip_path, upload_response.status_code))
        return upload_url

    def delete_function(self, name):
        """
        Deletes cloud function specified by name.

        Blocks until the asynchronous delete operation has finished.

        :param name: name of the function
        :type name: str
        :return: response returned by the operation
        :rtype: dict
        :exception: AirflowException in case the operation finished with an error
        """
        response = self.get_conn().projects().locations().functions().delete(
            name=name).execute(num_retries=NUM_RETRIES)
        operation_name = response["name"]
        return self._wait_for_operation_to_complete(operation_name)

    def _wait_for_operation_to_complete(self, operation_name):
        """
        Waits for the named operation to complete - checks status of the
        asynchronous call.

        The operation is polled every TIME_TO_SLEEP_IN_SECONDS seconds until it is
        reported as done; there is no upper bound on the waiting time.

        :param operation_name: name of the operation
        :type operation_name: str
        :return: response returned by the operation
        :rtype: dict
        :exception: AirflowException in case error is returned
        """
        service = self.get_conn()
        while True:
            operation_response = service.operations().get(
                name=operation_name,
            ).execute(num_retries=NUM_RETRIES)
            if operation_response.get("done"):
                response = operation_response.get("response")
                error = operation_response.get("error")
                # Note, according to documentation always either response or error is
                # set when "done" == True
                if error:
                    raise AirflowException(str(error))
                return response
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)
Loading