From 38d0ff7f6789f82cb691824951a01b6c0ee16342 Mon Sep 17 00:00:00 2001 From: averikitsch Date: Fri, 12 Jul 2019 10:13:55 -0700 Subject: [PATCH 1/7] Migration guide --- tasks/app.yaml | 23 +++++ tasks/main.py | 64 ++++++++++++++ tasks/migration.py | 196 +++++++++++++++++++++++++++++++++++++++++ tasks/requirements.txt | 1 + 4 files changed, 284 insertions(+) create mode 100644 tasks/app.yaml create mode 100644 tasks/main.py create mode 100644 tasks/migration.py diff --git a/tasks/app.yaml b/tasks/app.yaml new file mode 100644 index 00000000000..02a7493c77e --- /dev/null +++ b/tasks/app.yaml @@ -0,0 +1,23 @@ +# Copyright 2019 Google LLC All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +runtime: python37 +service: worker + +# [START taskqueues_use_https] +handlers: +- url: /.* + script: auto + secure: always +# [END taskqueues_use_https] diff --git a/tasks/main.py b/tasks/main.py new file mode 100644 index 00000000000..f12d14c2bfb --- /dev/null +++ b/tasks/main.py @@ -0,0 +1,64 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START taskqueues_request_handler] +from flask import Flask, request +from google.cloud import datastore + + +app = Flask(__name__) + + +@app.route('/update_counter', methods=['POST']) +def example_task_handler(): + # [START taskqueues_secure_handler] + if request.headers.get('X-Appengine-Taskname') is None: + # You may use the presence of the X-Appengine-Taskname header to validate + # the request comes from Cloud Tasks. + print('Invalid Task: No X-Appengine-Taskname request header found') + return 'Bad Request - Invalid Task', 400 + # [END taskqueues_secure_handler] + + amount = int(request.get_data()) + uodate_counter(amount) + return + + +def update_counter(amount): + # Instantiates a client + client = datastore.Client() + + key = client.key('Counter', 'count') + counter = client.get(key) + + # Create entity if it doesn't exist + if counter is None: + counter = datastore.Entity(key) + previous = 0 + else: + previous = counter['count'] + + counter.update({'count': amount + previous}) + # Send update request + client.put(counter) + + print('Counter: {}'.format(amount + previous)) + return + + +if __name__ == '__main__': + # This is used when running locally. Gunicorn is used to run the + # application on Google App Engine. See entrypoint in app.yaml. + app.run(host='127.0.0.1', port=8080, debug=True) +# [END taskqueues_request_handler] diff --git a/tasks/migration.py b/tasks/migration.py new file mode 100644 index 00000000000..7c547a15b0d --- /dev/null +++ b/tasks/migration.py @@ -0,0 +1,196 @@ + +from google.cloud import tasks +from google.protobuf import timestamp_pb2 +from google.protobuf import duration_pb2 +import json + +# Create a client. +client = tasks.CloudTasksClient() + +project = 'my-project-id' +location = 'us-central1' + +# [START taskqueues_using_yaml] +parent = client.location_path(project, location) +queue_blue = { + 'name': client.queue_path(project, location, 'queue-blue'), + 'rate_limits': { + 'max_dispatches_per_second': 5 + }, + 'app_engine_routing_override': { + 'version': 'v2', + 'service': 'task-module' + } +} + +queue_red = { + 'name': client.queue_path(project, location, 'queue-red'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + } +} + +## Create Queue +queues = [queue_blue, queue_red] +for queue in queues: + response = client.create_queue(parent, queue) + print(response) +# [END taskqueues_using_yaml] + + +# [START taskqueues_processing_rate] +# Get queue object +queue_path = client.queue_path(project, location, 'queue-blue') +queue = client.get_queue(queue_path) + +# Update queue object +queue.rate_limits.max_dispatches_per_second = 20 +queue.rate_limits.max_concurrent_dispatches = 10 + +# Send update request +response = client.update_queue(queue) +print(response) +# [END taskqueues_processing_rate] + + +# [START taskqueues_new_task] +amount = '10'.encode() +queue = 'default' + +# Construct the fully qualified queue name. +parent = client.queue_path(project, location, queue) + +# Construct the request body. +task = { + 'app_engine_http_request': { + 'http_method': 'POST', + 'relative_uri': '/update_counter', + 'app_engine_routing': { + 'service': 'worker' + }, + 'body': amount + } +} + +# Use the client to build and send the task. +response = client.create_task(parent, task) + +eta = response.schedule_time.ToDatetime().strftime("%m/%d/%Y, %H:%M:%S") +print('Task {} enqueued, ETA {}.'.format(response.name, eta)) +# [END taskqueues_new_task] + +# [START taskqueues_passing_data] +task1 = { + 'app_engine_http_request': { + 'http_method': 'POST', + 'relative_uri': '/update_counter?key=blue', + 'app_engine_routing': { + 'service': 'worker' + } + } +} + +task2 = { + 'app_engine_http_request': { + 'http_method': 'POST', + 'relative_uri': '/update_counter', + 'app_engine_routing': { + 'service': 'worker' + }, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({'key': 'blue'}).encode() + } +} + +# Use the client to build and send the task. +tasks = [task1, task2] +for task in tasks: + response = client.create_task(parent, task) + print(response) +# [END taskqueues_passing_data] + +# [START taskqueues_naming_tasks] +task = { + 'name': client.task_path(project, location, queue, 'first-try'), + 'app_engine_http_request': { + 'http_method': 'GET', + 'relative_uri': '/url/path' + } +} +response = client.create_task(parent, task) +print(response) +# [END taskqueues_naming_tasks] + +# [START taskqueues_deleting_tasks] +task_path = client.task_path(project, location, 'queue1', 'foo') +response = client.delete_task(task_path) +# [END taskqueues_deleting_tasks] + +# [START taskqueues_purging_tasks] +queue_path = client.queue_path(project, location, 'queue1') +response = client.purge_queue(queue_path) +# [END taskqueues_purging_tasks] + +# [START taskqueues_pause_queue] +queue_path = client.queue_path(project, location, 'queue1') +response = client.pause_queue(queue_path) +# [END taskqueues_pause_queues] + +# [START taskqueues_deleting_queues] +queue_path = client.queue_path(project, location, 'queue1') +response = client.delete_queue(queue_path) +# [END taskqueues_deleting_queues] + +# [START taskqueues_retrying_tasks] +from google.protobuf import duration_pb2 + +fooqueue = { + 'name': client.queue_path(project, location, 'fooqueue'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + }, + 'retry_config': { + 'max_attempts': 7, + 'max_retry_duration': 2*60*60*24 + } +} + +min = duration_pb2.Duration() +min.seconds = 10 + +max = duration_pb2.Duration() +max.seconds = 200 + +barqueue = { + 'name': client.queue_path(project, location, 'barqueue'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + }, + 'retry_config': { + 'min_backoff': min, + 'max_backoff': max, + 'nax_doublings': 0 + } +} + +max.seconds = 300 +bazqueue = { + 'name': client.queue_path(project, location, 'bazqueue'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + }, + 'retry_config': { + 'min_backoff': min, + 'max_backoff': max, + 'nax_doublings': 3 + } +} + +## Create Queue +queues = [fooqueue, barqueue, bazqueue] +for queue in queues: + response = client.create_queue(parent, queue) + print(response) +# [END taskqueues_retrying_tasks] diff --git a/tasks/requirements.txt b/tasks/requirements.txt index 29d55c8174e..943dc7d5e9c 100644 --- a/tasks/requirements.txt +++ b/tasks/requirements.txt @@ -2,3 +2,4 @@ Flask==1.0.2 gunicorn==19.9.0 google-cloud-tasks==1.1.0 googleapis-common-protos==1.6.0 +google-cloud-datastore==1.7.3 From 5b8f5598b72859352b9302ca19992a957e8e65c6 Mon Sep 17 00:00:00 2001 From: averikitsch Date: Wed, 24 Jul 2019 11:53:35 -0700 Subject: [PATCH 2/7] remove app and update migraitonn --- tasks/app.yaml | 23 --- tasks/main.py | 64 ------- tasks/migration.py | 396 +++++++++++++++++++++++------------------ tasks/requirements.txt | 3 - 4 files changed, 223 insertions(+), 263 deletions(-) delete mode 100644 tasks/app.yaml delete mode 100644 tasks/main.py diff --git a/tasks/app.yaml b/tasks/app.yaml deleted file mode 100644 index 02a7493c77e..00000000000 --- a/tasks/app.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2019 Google LLC All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -runtime: python37 -service: worker - -# [START taskqueues_use_https] -handlers: -- url: /.* - script: auto - secure: always -# [END taskqueues_use_https] diff --git a/tasks/main.py b/tasks/main.py deleted file mode 100644 index f12d14c2bfb..00000000000 --- a/tasks/main.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START taskqueues_request_handler] -from flask import Flask, request -from google.cloud import datastore - - -app = Flask(__name__) - - -@app.route('/update_counter', methods=['POST']) -def example_task_handler(): - # [START taskqueues_secure_handler] - if request.headers.get('X-Appengine-Taskname') is None: - # You may use the presence of the X-Appengine-Taskname header to validate - # the request comes from Cloud Tasks. - print('Invalid Task: No X-Appengine-Taskname request header found') - return 'Bad Request - Invalid Task', 400 - # [END taskqueues_secure_handler] - - amount = int(request.get_data()) - uodate_counter(amount) - return - - -def update_counter(amount): - # Instantiates a client - client = datastore.Client() - - key = client.key('Counter', 'count') - counter = client.get(key) - - # Create entity if it doesn't exist - if counter is None: - counter = datastore.Entity(key) - previous = 0 - else: - previous = counter['count'] - - counter.update({'count': amount + previous}) - # Send update request - client.put(counter) - - print('Counter: {}'.format(amount + previous)) - return - - -if __name__ == '__main__': - # This is used when running locally. Gunicorn is used to run the - # application on Google App Engine. See entrypoint in app.yaml. - app.run(host='127.0.0.1', port=8080, debug=True) -# [END taskqueues_request_handler] diff --git a/tasks/migration.py b/tasks/migration.py index 7c547a15b0d..ce0c7bb846e 100644 --- a/tasks/migration.py +++ b/tasks/migration.py @@ -1,196 +1,246 @@ +# Copyright 2019 Google LLC All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from google.cloud import tasks -from google.protobuf import timestamp_pb2 -from google.protobuf import duration_pb2 -import json - -# Create a client. -client = tasks.CloudTasksClient() - -project = 'my-project-id' -location = 'us-central1' - -# [START taskqueues_using_yaml] -parent = client.location_path(project, location) -queue_blue = { - 'name': client.queue_path(project, location, 'queue-blue'), - 'rate_limits': { - 'max_dispatches_per_second': 5 - }, - 'app_engine_routing_override': { - 'version': 'v2', - 'service': 'task-module' - } -} -queue_red = { - 'name': client.queue_path(project, location, 'queue-red'), - 'rate_limits': { - 'max_dispatches_per_second': 1 - } -} +def create_queue(): + # [START taskqueues_using_yaml] + client = tasks.CloudTasksClient() -## Create Queue -queues = [queue_blue, queue_red] -for queue in queues: - response = client.create_queue(parent, queue) - print(response) -# [END taskqueues_using_yaml] + project = 'my-project-id' + location = 'us-central1' + parent = client.location_path(project, location) + queue_blue = { + 'name': client.queue_path(project, location, 'queue-blue'), + 'rate_limits': { + 'max_dispatches_per_second': 5 + }, + 'app_engine_routing_override': { + 'version': 'v2', + 'service': 'task-module' + } + } -# [START taskqueues_processing_rate] -# Get queue object -queue_path = client.queue_path(project, location, 'queue-blue') -queue = client.get_queue(queue_path) + queue_red = { + 'name': client.queue_path(project, location, 'queue-red'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + } + } -# Update queue object -queue.rate_limits.max_dispatches_per_second = 20 -queue.rate_limits.max_concurrent_dispatches = 10 + queues = [queue_blue, queue_red] + for queue in queues: + response = client.create_queue(parent, queue) + print(response) + # [END taskqueues_using_yaml] -# Send update request -response = client.update_queue(queue) -print(response) -# [END taskqueues_processing_rate] +def update_queue(): + # [START taskqueues_processing_rate] + client = tasks.CloudTasksClient() -# [START taskqueues_new_task] -amount = '10'.encode() -queue = 'default' + project = 'my-project-id' + location = 'us-central1' -# Construct the fully qualified queue name. -parent = client.queue_path(project, location, queue) + # Get queue object + queue_path = client.queue_path(project, location, 'queue-blue') + queue = client.get_queue(queue_path) -# Construct the request body. -task = { - 'app_engine_http_request': { - 'http_method': 'POST', - 'relative_uri': '/update_counter', - 'app_engine_routing': { - 'service': 'worker' - }, - 'body': amount + # Update queue object + queue.rate_limits.max_dispatches_per_second = 20 + queue.rate_limits.max_concurrent_dispatches = 10 + + response = client.update_queue(queue) + print(response) + # [END taskqueues_processing_rate] + + +def create_task(): + # [START taskqueues_new_task] + client = tasks.CloudTasksClient() + + project = 'my-project-id' + location = 'us-central1' + queue = 'default' + amount = 10 + parent = client.queue_path(project, location, queue) + + task = { + 'app_engine_http_request': { + 'http_method': 'POST', + 'relative_uri': '/update_counter', + 'app_engine_routing': { + 'service': 'worker' + }, + 'body': str(amount).encode() + } } -} - -# Use the client to build and send the task. -response = client.create_task(parent, task) - -eta = response.schedule_time.ToDatetime().strftime("%m/%d/%Y, %H:%M:%S") -print('Task {} enqueued, ETA {}.'.format(response.name, eta)) -# [END taskqueues_new_task] - -# [START taskqueues_passing_data] -task1 = { - 'app_engine_http_request': { - 'http_method': 'POST', - 'relative_uri': '/update_counter?key=blue', - 'app_engine_routing': { - 'service': 'worker' + + response = client.create_task(parent, task) + eta = response.schedule_time.ToDatetime().strftime("%m/%d/%Y, %H:%M:%S") + print('Task {} enqueued, ETA {}.'.format(response.name, eta)) + # [END taskqueues_new_task] + +def create_tasks_with_data(): + # [START taskqueues_passing_data] + import json + client = tasks.CloudTasksClient() + + project = 'my-project-id' + location = 'us-central1' + queue = 'default' + parent = client.queue_path(project, location, queue) + + task1 = { + 'app_engine_http_request': { + 'http_method': 'POST', + 'relative_uri': '/update_counter?key=blue', + 'app_engine_routing': { + 'service': 'worker' + } } } -} - -task2 = { - 'app_engine_http_request': { - 'http_method': 'POST', - 'relative_uri': '/update_counter', - 'app_engine_routing': { - 'service': 'worker' - }, - 'headers': { - 'Content-Type': 'application/json' - }, - 'body': json.dumps({'key': 'blue'}).encode() + + task2 = { + 'app_engine_http_request': { + 'http_method': 'POST', + 'relative_uri': '/update_counter', + 'app_engine_routing': { + 'service': 'worker' + }, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({'key': 'blue'}).encode() + } } -} -# Use the client to build and send the task. -tasks = [task1, task2] -for task in tasks: + tasks = [task1, task2] + for task in tasks: + response = client.create_task(parent, task) + print(response) + # [END taskqueues_passing_data] + + +def create_task_with_name(): + # [START taskqueues_naming_tasks] + client = tasks.CloudTasksClient() + + project = 'my-project-id' + location = 'us-central1' + queue = "default" + parent = client.queue_path(project, location, queue) + + task = { + 'name': client.task_path(project, location, queue, 'first-try'), + 'app_engine_http_request': { + 'http_method': 'GET', + 'relative_uri': '/url/path' + } + } response = client.create_task(parent, task) print(response) -# [END taskqueues_passing_data] - -# [START taskqueues_naming_tasks] -task = { - 'name': client.task_path(project, location, queue, 'first-try'), - 'app_engine_http_request': { - 'http_method': 'GET', - 'relative_uri': '/url/path' - } -} -response = client.create_task(parent, task) -print(response) -# [END taskqueues_naming_tasks] - -# [START taskqueues_deleting_tasks] -task_path = client.task_path(project, location, 'queue1', 'foo') -response = client.delete_task(task_path) -# [END taskqueues_deleting_tasks] - -# [START taskqueues_purging_tasks] -queue_path = client.queue_path(project, location, 'queue1') -response = client.purge_queue(queue_path) -# [END taskqueues_purging_tasks] - -# [START taskqueues_pause_queue] -queue_path = client.queue_path(project, location, 'queue1') -response = client.pause_queue(queue_path) -# [END taskqueues_pause_queues] - -# [START taskqueues_deleting_queues] -queue_path = client.queue_path(project, location, 'queue1') -response = client.delete_queue(queue_path) -# [END taskqueues_deleting_queues] - -# [START taskqueues_retrying_tasks] -from google.protobuf import duration_pb2 - -fooqueue = { - 'name': client.queue_path(project, location, 'fooqueue'), - 'rate_limits': { - 'max_dispatches_per_second': 1 - }, - 'retry_config': { - 'max_attempts': 7, - 'max_retry_duration': 2*60*60*24 + # [END taskqueues_naming_tasks] + + +def delete_tasks(): + # [START taskqueues_setup] + client = tasks.CloudTasksClient() + + project = 'my-project-id' + location = 'us-central1' + # [START taskqueues_setup] + + # [START taskqueues_deleting_tasks] + task_path = client.task_path(project, location, 'queue1', 'foo') + response = client.delete_task(task_path) + # [END taskqueues_deleting_tasks] + + # [START taskqueues_purging_tasks] + queue_path = client.queue_path(project, location, 'queue1') + response = client.purge_queue(queue_path) + # [END taskqueues_purging_tasks] + + # [START taskqueues_pause_queue] + queue_path = client.queue_path(project, location, 'queue1') + response = client.pause_queue(queue_path) + # [END taskqueues_pause_queues] + + # [START taskqueues_deleting_queues] + queue_path = client.queue_path(project, location, 'queue1') + response = client.delete_queue(queue_path) + # [END taskqueues_deleting_queues] + + +def retry_task(): + # [START taskqueues_retrying_tasks] + from google.protobuf import duration_pb2 + + client = tasks.CloudTasksClient() + + project = 'my-project-id' + location = 'us-central1' + parent = client.location_path(project, location) + + max_retry = duration_pb2.Duration() + max_retry.seconds = 2*60*60*24 + + fooqueue = { + 'name': client.queue_path(project, location, 'fooqueue'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + }, + 'retry_config': { + 'max_attempts': 7, + 'max_retry_duration': max_retry + } } -} - -min = duration_pb2.Duration() -min.seconds = 10 - -max = duration_pb2.Duration() -max.seconds = 200 - -barqueue = { - 'name': client.queue_path(project, location, 'barqueue'), - 'rate_limits': { - 'max_dispatches_per_second': 1 - }, - 'retry_config': { - 'min_backoff': min, - 'max_backoff': max, - 'nax_doublings': 0 + + min = duration_pb2.Duration() + min.seconds = 10 + + max = duration_pb2.Duration() + max.seconds = 200 + + barqueue = { + 'name': client.queue_path(project, location, 'barqueue'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + }, + 'retry_config': { + 'min_backoff': min, + 'max_backoff': max, + 'nax_doublings': 0 + } } -} - -max.seconds = 300 -bazqueue = { - 'name': client.queue_path(project, location, 'bazqueue'), - 'rate_limits': { - 'max_dispatches_per_second': 1 - }, - 'retry_config': { - 'min_backoff': min, - 'max_backoff': max, - 'nax_doublings': 3 + + max.seconds = 300 + bazqueue = { + 'name': client.queue_path(project, location, 'bazqueue'), + 'rate_limits': { + 'max_dispatches_per_second': 1 + }, + 'retry_config': { + 'min_backoff': min, + 'max_backoff': max, + 'nax_doublings': 3 + } } -} -## Create Queue -queues = [fooqueue, barqueue, bazqueue] -for queue in queues: - response = client.create_queue(parent, queue) - print(response) -# [END taskqueues_retrying_tasks] + queues = [fooqueue, barqueue, bazqueue] + for queue in queues: + response = client.create_queue(parent, queue) + print(response) + # [END taskqueues_retrying_tasks] diff --git a/tasks/requirements.txt b/tasks/requirements.txt index 943dc7d5e9c..7abf4e33bcd 100644 --- a/tasks/requirements.txt +++ b/tasks/requirements.txt @@ -1,5 +1,2 @@ -Flask==1.0.2 -gunicorn==19.9.0 google-cloud-tasks==1.1.0 googleapis-common-protos==1.6.0 -google-cloud-datastore==1.7.3 From 7dd0ac5bc9532c1ceb8e8a54261c14894653b9ba Mon Sep 17 00:00:00 2001 From: averikitsch Date: Mon, 5 Aug 2019 17:06:53 -0700 Subject: [PATCH 3/7] snippets for migration guide - tests added --- .../flexible/tasks/snippets.py | 130 ++++++++++++------ appengine/flexible/tasks/snippets_test.py | 109 +++++++++++++++ 2 files changed, 194 insertions(+), 45 deletions(-) rename tasks/migration.py => appengine/flexible/tasks/snippets.py (62%) create mode 100644 appengine/flexible/tasks/snippets_test.py diff --git a/tasks/migration.py b/appengine/flexible/tasks/snippets.py similarity index 62% rename from tasks/migration.py rename to appengine/flexible/tasks/snippets.py index ce0c7bb846e..26f705119bb 100644 --- a/tasks/migration.py +++ b/appengine/flexible/tasks/snippets.py @@ -14,16 +14,20 @@ from google.cloud import tasks -def create_queue(): +def create_queue(project, location, queue_blue_name, queue_red_name): # [START taskqueues_using_yaml] client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue_blue_name = 'queue-blue' + # queue_red_name = 'queue-red' + parent = client.location_path(project, location) queue_blue = { - 'name': client.queue_path(project, location, 'queue-blue'), + 'name': client.queue_path(project, location, queue_blue_name), 'rate_limits': { 'max_dispatches_per_second': 5 }, @@ -34,7 +38,7 @@ def create_queue(): } queue_red = { - 'name': client.queue_path(project, location, 'queue-red'), + 'name': client.queue_path(project, location, queue_red_name), 'rate_limits': { 'max_dispatches_per_second': 1 } @@ -43,19 +47,23 @@ def create_queue(): queues = [queue_blue, queue_red] for queue in queues: response = client.create_queue(parent, queue) + print("Created queue: ") print(response) # [END taskqueues_using_yaml] + return response -def update_queue(): +def update_queue(project, location, queue): # [START taskqueues_processing_rate] client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue = 'queue-blue' # Get queue object - queue_path = client.queue_path(project, location, 'queue-blue') + queue_path = client.queue_path(project, location, queue) queue = client.get_queue(queue_path) # Update queue object @@ -63,18 +71,22 @@ def update_queue(): queue.rate_limits.max_concurrent_dispatches = 10 response = client.update_queue(queue) + print("Updated queue: ") print(response) # [END taskqueues_processing_rate] + return response -def create_task(): +def create_task(project, location, queue): # [START taskqueues_new_task] client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' - queue = 'default' + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue = 'default' amount = 10 + parent = client.queue_path(project, location, queue) task = { @@ -92,15 +104,18 @@ def create_task(): eta = response.schedule_time.ToDatetime().strftime("%m/%d/%Y, %H:%M:%S") print('Task {} enqueued, ETA {}.'.format(response.name, eta)) # [END taskqueues_new_task] + return response -def create_tasks_with_data(): +def create_tasks_with_data(project, location, queue): # [START taskqueues_passing_data] import json client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' - queue = 'default' + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue = 'default' + parent = client.queue_path(project, location, queue) task1 = { @@ -127,24 +142,28 @@ def create_tasks_with_data(): } } - tasks = [task1, task2] - for task in tasks: - response = client.create_task(parent, task) - print(response) + response = client.create_task(parent, task1) + print(response) + response = client.create_task(parent, task2) + print(response) # [END taskqueues_passing_data] + return response -def create_task_with_name(): +def create_task_with_name(project, location, queue, task_name): # [START taskqueues_naming_tasks] client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' - queue = "default" + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue = 'default' + # task_name = 'first-try' + parent = client.queue_path(project, location, queue) task = { - 'name': client.task_path(project, location, queue, 'first-try'), + 'name': client.task_path(project, location, queue, task_name), 'app_engine_http_request': { 'http_method': 'GET', 'relative_uri': '/url/path' @@ -153,52 +172,72 @@ def create_task_with_name(): response = client.create_task(parent, task) print(response) # [END taskqueues_naming_tasks] + return response -def delete_tasks(): +def delete_task(project, location, queue): # [START taskqueues_setup] client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue = 'queue1' # [START taskqueues_setup] # [START taskqueues_deleting_tasks] - task_path = client.task_path(project, location, 'queue1', 'foo') + task_path = client.task_path(project, location, queue, 'foo') response = client.delete_task(task_path) # [END taskqueues_deleting_tasks] # [START taskqueues_purging_tasks] - queue_path = client.queue_path(project, location, 'queue1') + queue_path = client.queue_path(project, location, queue) response = client.purge_queue(queue_path) # [END taskqueues_purging_tasks] # [START taskqueues_pause_queue] - queue_path = client.queue_path(project, location, 'queue1') + queue_path = client.queue_path(project, location, queue) response = client.pause_queue(queue_path) # [END taskqueues_pause_queues] + return response + + +def delete_queue(project, location, queue): + client = tasks.CloudTasksClient() + + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # queue = 'queue1' # [START taskqueues_deleting_queues] - queue_path = client.queue_path(project, location, 'queue1') + queue_path = client.queue_path(project, location, queue) response = client.delete_queue(queue_path) # [END taskqueues_deleting_queues] + return response -def retry_task(): +def retry_task(project, location, fooqueue, barqueue, bazqueue): # [START taskqueues_retrying_tasks] from google.protobuf import duration_pb2 client = tasks.CloudTasksClient() - project = 'my-project-id' - location = 'us-central1' + # TODO(developer): Uncomment these lines and replace with your values. + # project = 'my-project-id' + # location = 'us- central1' + # fooqueue = 'fooqueue' + # barqueue = 'barqueue' + # bazqueue = 'bazqueue' + + parent = client.location_path(project, location) max_retry = duration_pb2.Duration() max_retry.seconds = 2*60*60*24 - fooqueue = { - 'name': client.queue_path(project, location, 'fooqueue'), + foo = { + 'name': client.queue_path(project, location, fooqueue), 'rate_limits': { 'max_dispatches_per_second': 1 }, @@ -214,33 +253,34 @@ def retry_task(): max = duration_pb2.Duration() max.seconds = 200 - barqueue = { - 'name': client.queue_path(project, location, 'barqueue'), + bar = { + 'name': client.queue_path(project, location, barqueue), 'rate_limits': { 'max_dispatches_per_second': 1 }, 'retry_config': { 'min_backoff': min, 'max_backoff': max, - 'nax_doublings': 0 + 'max_doublings': 0 } } max.seconds = 300 - bazqueue = { - 'name': client.queue_path(project, location, 'bazqueue'), + baz = { + 'name': client.queue_path(project, location, bazqueue), 'rate_limits': { 'max_dispatches_per_second': 1 }, 'retry_config': { 'min_backoff': min, 'max_backoff': max, - 'nax_doublings': 3 + 'max_doublings': 3 } } - queues = [fooqueue, barqueue, bazqueue] + queues = [foo, bar, baz] for queue in queues: response = client.create_queue(parent, queue) print(response) # [END taskqueues_retrying_tasks] + return response diff --git a/appengine/flexible/tasks/snippets_test.py b/appengine/flexible/tasks/snippets_test.py new file mode 100644 index 00000000000..7524feb5e9a --- /dev/null +++ b/appengine/flexible/tasks/snippets_test.py @@ -0,0 +1,109 @@ +# Copyright 2019 Google LLC All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pytest +import uuid + +import snippets + +TEST_PROJECT_ID = os.getenv('GCLOUD_PROJECT') +TEST_LOCATION = os.getenv('TEST_QUEUE_LOCATION', 'us-central1') +QUEUE_NAME_1 = "queue-{}".format(uuid.uuid4()) +QUEUE_NAME_2 = "queue-{}".format(uuid.uuid4()) + +@pytest.mark.order1 +def test_create_queue(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_2) + result = snippets.create_queue( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1, QUEUE_NAME_2) + assert name in result.name + + +@pytest.mark.order2 +def test_update_queue(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + result = snippets.update_queue( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + assert name in result.name + + +@pytest.mark.order3 +def test_create_task(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + result = snippets.create_task( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + assert name in result.name + + +@pytest.mark.order4 +def test_create_task_with_data(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + result = snippets.create_tasks_with_data( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + assert name in result.name + + +@pytest.mark.order5 +def test_create_task_with_name(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + result = snippets.create_task_with_name( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1, 'foo') + assert name in result.name + + +@pytest.mark.order6 +def test_delete_task(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + result = snippets.delete_task( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + assert name in result.name + + +@pytest.mark.order8 +def test_delete_queue(): + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + result = snippets.delete_queue( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) + assert None == result + + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_2) + result = snippets.delete_queue( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_2) + assert None == result + + +@pytest.mark.order7 +def test_retry_task(): + QUEUE_NAME = [] + for i in range(3): + QUEUE_NAME.append("queue-{}".format(uuid.uuid4())) + + name = "projects/{}/locations/{}/queues/{}".format( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[2]) + result = snippets.retry_task( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[0], QUEUE_NAME[1], QUEUE_NAME[2]) + assert name in result.name + + for i in range(3): + snippets.delete_queue( + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[i]) From faa3997138b1c906d3027176b2077b0ad7c91b5c Mon Sep 17 00:00:00 2001 From: averikitsch Date: Mon, 5 Aug 2019 17:24:19 -0700 Subject: [PATCH 4/7] lint --- appengine/flexible/tasks/snippets.py | 5 +++-- appengine/flexible/tasks/snippets_test.py | 12 +++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/appengine/flexible/tasks/snippets.py b/appengine/flexible/tasks/snippets.py index 26f705119bb..e8108a09875 100644 --- a/appengine/flexible/tasks/snippets.py +++ b/appengine/flexible/tasks/snippets.py @@ -14,6 +14,7 @@ from google.cloud import tasks + def create_queue(project, location, queue_blue_name, queue_red_name): # [START taskqueues_using_yaml] client = tasks.CloudTasksClient() @@ -106,6 +107,7 @@ def create_task(project, location, queue): # [END taskqueues_new_task] return response + def create_tasks_with_data(project, location, queue): # [START taskqueues_passing_data] import json @@ -162,7 +164,7 @@ def create_task_with_name(project, location, queue, task_name): parent = client.queue_path(project, location, queue) - task = { + task = { 'name': client.task_path(project, location, queue, task_name), 'app_engine_http_request': { 'http_method': 'GET', @@ -230,7 +232,6 @@ def retry_task(project, location, fooqueue, barqueue, bazqueue): # barqueue = 'barqueue' # bazqueue = 'bazqueue' - parent = client.location_path(project, location) max_retry = duration_pb2.Duration() diff --git a/appengine/flexible/tasks/snippets_test.py b/appengine/flexible/tasks/snippets_test.py index 7524feb5e9a..eaa192fbd30 100644 --- a/appengine/flexible/tasks/snippets_test.py +++ b/appengine/flexible/tasks/snippets_test.py @@ -23,6 +23,7 @@ QUEUE_NAME_1 = "queue-{}".format(uuid.uuid4()) QUEUE_NAME_2 = "queue-{}".format(uuid.uuid4()) + @pytest.mark.order1 def test_create_queue(): name = "projects/{}/locations/{}/queues/{}".format( @@ -79,17 +80,13 @@ def test_delete_task(): @pytest.mark.order8 def test_delete_queue(): - name = "projects/{}/locations/{}/queues/{}".format( - TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) result = snippets.delete_queue( TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) - assert None == result + assert result is None - name = "projects/{}/locations/{}/queues/{}".format( - TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_2) result = snippets.delete_queue( TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_2) - assert None == result + assert result is None @pytest.mark.order7 @@ -101,7 +98,8 @@ def test_retry_task(): name = "projects/{}/locations/{}/queues/{}".format( TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[2]) result = snippets.retry_task( - TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[0], QUEUE_NAME[1], QUEUE_NAME[2]) + TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[0], QUEUE_NAME[1], + QUEUE_NAME[2]) assert name in result.name for i in range(3): From f6c90452c7e96a67a075d971a55171131e6533e0 Mon Sep 17 00:00:00 2001 From: averikitsch Date: Mon, 5 Aug 2019 17:29:07 -0700 Subject: [PATCH 5/7] remove print statements --- appengine/flexible/tasks/snippets.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/appengine/flexible/tasks/snippets.py b/appengine/flexible/tasks/snippets.py index e8108a09875..0c96f96d87d 100644 --- a/appengine/flexible/tasks/snippets.py +++ b/appengine/flexible/tasks/snippets.py @@ -48,7 +48,6 @@ def create_queue(project, location, queue_blue_name, queue_red_name): queues = [queue_blue, queue_red] for queue in queues: response = client.create_queue(parent, queue) - print("Created queue: ") print(response) # [END taskqueues_using_yaml] return response @@ -72,7 +71,6 @@ def update_queue(project, location, queue): queue.rate_limits.max_concurrent_dispatches = 10 response = client.update_queue(queue) - print("Updated queue: ") print(response) # [END taskqueues_processing_rate] return response From 0d333d33ccc84c1c5b5a8e59acd39e144e9b9df2 Mon Sep 17 00:00:00 2001 From: averikitsch Date: Tue, 6 Aug 2019 10:52:01 -0700 Subject: [PATCH 6/7] Styling changes --- appengine/flexible/tasks/snippets_test.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/appengine/flexible/tasks/snippets_test.py b/appengine/flexible/tasks/snippets_test.py index eaa192fbd30..8923c670ea6 100644 --- a/appengine/flexible/tasks/snippets_test.py +++ b/appengine/flexible/tasks/snippets_test.py @@ -78,7 +78,7 @@ def test_delete_task(): assert name in result.name -@pytest.mark.order8 +@pytest.mark.order7 def test_delete_queue(): result = snippets.delete_queue( TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME_1) @@ -89,10 +89,11 @@ def test_delete_queue(): assert result is None -@pytest.mark.order7 +@pytest.mark.order8 def test_retry_task(): + QUEUE_SIZE = 3 QUEUE_NAME = [] - for i in range(3): + for i in range(QUEUE_SIZE): QUEUE_NAME.append("queue-{}".format(uuid.uuid4())) name = "projects/{}/locations/{}/queues/{}".format( @@ -102,6 +103,8 @@ def test_retry_task(): QUEUE_NAME[2]) assert name in result.name - for i in range(3): + for i in range(QUEUE_SIZE): snippets.delete_queue( - TEST_PROJECT_ID, TEST_LOCATION, QUEUE_NAME[i]) + project=TEST_PROJECT_ID, + location=TEST_LOCATION, + queue=QUEUE_NAME[i]) From 4cd44044332efc08d6616e6b3b0df93e5085b7ca Mon Sep 17 00:00:00 2001 From: averikitsch Date: Tue, 6 Aug 2019 11:05:04 -0700 Subject: [PATCH 7/7] Travis trigger