From 41f9a7e746d026cce0defb8df09ff6b6fdf21286 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 28 Nov 2022 12:55:54 -0800 Subject: [PATCH 1/5] add samples, tests pending --- google/cloud/storage/transfer_manager.py | 2 +- samples/snippets/storage_transfer_manager.py | 129 +++++++++++++++++++ 2 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 samples/snippets/storage_transfer_manager.py diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 162e6465d..8bdae49ca 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -370,7 +370,7 @@ def upload_many_from_filenames( If True, blobs that already have a live version will not be overwritten. This is accomplished by setting "if_generation_match = 0" on uploads. Uploads so skipped will result in a 412 Precondition Failed response - code, which will be included in the return value but not raised + code, which will be included in the return value, but not raised as an exception regardless of the value of raise_exception. :type blob_constructor_kwargs: dict diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py new file mode 100644 index 000000000..803cb88a7 --- /dev/null +++ b/samples/snippets/storage_transfer_manager.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from google.cloud.storage import Client +from google.cloud.storage import transfer_manager + +def download_all_blobs_with_transfer_manager(bucket_name, path_root=""): + """Download all of the blobs in a bucket, concurrently in a thread pool. + + The file name of each blob once downloaded is derived from the blob + name and the `path_root `parameter. For complete control of the filename + of each blob, use transfer_manager.download_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The path on your computer to which to download all of the files. This + # string is prepended to the name of each blob and can simply be a prefix, + # not a directory. If it is a directory, be sure to include the trailing + # slash in the path. An empty string means "the current working directory". + # path_root = "" + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + blob_names = [blob.name for blob in bucket.list_blobs()] + + results = transfer_manager.download_many_to_path(bucket, blob_names, path_root=path_root) + + for name, result in zip(blob_names, results): + # The results list is either `None` or an exception for each blob in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to download {} due to exception: {}".format(name, result)) + else: + print("Downloaded {} to {}.".format(name, path_root+name)) + + +def upload_many_blobs_with_transfer_manager(bucket_name, filenames, root=""): + """Upload every file in a list to a bucket, concurrently in a thread pool. + + Each blob name is derived from the filename, not including the `root` + parameter. For complete control of the blob name for each file (and other + aspects of individual blob metadata), use transfer_manager.upload_many() + instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # A list (or other iterable) of filenames to upload. 
+ # filenames = ["file_1.txt", "file_2.txt"] + + # The path on your computer that is the root of all of the files in the list + # of filenames. This string is prepended to the each filename to get the + # full path to the file. Be sure to include the trailing slash. Relative + # paths and absolute paths are both accepted. This string is *not* included + # in the name of the uploaded blob; it is only used to find the source + # files. An empty string means "the current working directory". + # root="" + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + results = transfer_manager.upload_many_from_filenames(bucket, filenames, root=root) + + for name, result in zip(filenames, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("uploaded {} to {}.".format(name, bucket.name)) + + +def download_blob_chunks_concurrently_with_transfer_manager(bucket_name, blob_name, local_filename, chunk_size=200 * 1024 * 1024): + """Download a single blob, in chunks, concurrently in a thread pool. + + This is intended for use with very large blobs.""" + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The name of the blob to download. + # blob_name = "your-blob.txt" + + # The filename (or path) on your computer to which to download the blob. + # local_filename = "your-file.txt" + + # The size of each chunk. The file will be divided into as many pieces as + # needed based on this chunk size. For instance, if the chunk size is + # 200 megabytes, a 1.6 gigabyte file would be downloaded in eight pieces + # concurrently. + # chunk_size = 200 * 1024 * 1024 # 200 MiB. + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + blob = bucket.blob(blob_name) + + # Open the local file in binary write mode. 
+ local_file = open(local_filename, "wb") + + # Unlike other transfer manager functions, which handle multiple files and + # return exceptions in a list, this function will simply raise any exception + # it encounters, and has no return value. + transfer_manager.download_chunks_concurrently_to_file(blob, local_file, chunk_size=chunk_size) + + # If we've gotten this far, it must have been successful. + + number_of_chunks = -(blob.size//-chunk_size) # Ceiling division + print("Downloaded {} to {} in {} chunk(s).".format(blob_name, local_file.name, number_of_chunks)) + From 76bb4fe61f2149bafafbd21f21aa4529e6f37d9e Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 28 Nov 2022 14:07:04 -0800 Subject: [PATCH 2/5] add snippet tests --- samples/snippets/snippets_test.py | 46 ++++++++++++++++++++ samples/snippets/storage_transfer_manager.py | 4 +- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index bc126010b..972a31cfa 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -74,6 +74,8 @@ import storage_upload_from_memory import storage_upload_from_stream import storage_upload_with_kms_key +import storage_transfer_manager + KMS_KEY = os.environ["CLOUD_KMS_KEY"] @@ -613,3 +615,47 @@ def test_storage_set_client_endpoint(capsys): out, _ = capsys.readouterr() assert "client initiated with endpoint: https://storage.googleapis.com" in out + + +def test_transfer_manager_snippets(test_bucket, capsys): + BLOB_NAMES = ["test.txt", "test2.txt", "/blobs/test.txt", "blobs/nesteddir/test.txt"] + BIG_BLOB_NAME = "bigblob.txt" + ALL_NAMES = BLOB_NAMES + [BIG_BLOB_NAME] + TEST_DATA_24_BYTES = b"I am a rather big blob! 
" + SIZE_MULTIPLIER = 1024 + + with tempfile.TemporaryDirectory() as uploads: + # Create dirs and nested dirs + for name in BLOB_NAMES: + relpath = os.path.dirname(name) + os.makedirs("{}/{}".format(uploads, relpath), exist_ok=True) + + # Create files with nested dirs to exercise directory handling. + for name in BLOB_NAMES: + with open("{}/{}".format(uploads, name), "w") as f: + f.write(name) + # Also create one somewhat bigger file. + with open("{}/{}".format(uploads, BIG_BLOB_NAME), "wb") as f: + f.write(TEST_DATA_24_BYTES * SIZE_MULTIPLIER) + + storage_transfer_manager.upload_many_blobs_with_transfer_manager(test_bucket.name, ALL_NAMES, root="{}/".format(uploads)) + out, _ = capsys.readouterr() + + for name in ALL_NAMES: + assert "Uploaded {}".format(name) in out + + with tempfile.TemporaryDirectory() as downloads: + # First let's download the bigger file in chunks. + big_destination_path = "{}/chunkeddl.txt".format(downloads) + storage_transfer_manager.download_blob_chunks_concurrently_with_transfer_manager(test_bucket.name, BIG_BLOB_NAME, big_destination_path, chunk_size=SIZE_MULTIPLIER) + out, _ = capsys.readouterr() + + assert "Downloaded {} to {} in {} chunk(s).".format(BIG_BLOB_NAME, big_destination_path, len(TEST_DATA_24_BYTES)) in out + + # Now all the smaller files, plus the big file again because it's + # still in the bucket. 
+ storage_transfer_manager.download_all_blobs_with_transfer_manager(test_bucket.name, path_root="{}/".format(downloads)) + out, _ = capsys.readouterr() + + for name in ALL_NAMES: + assert "Downloaded {}".format(name) in out diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index 803cb88a7..5cce5d873 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - # Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); @@ -86,7 +84,7 @@ def upload_many_blobs_with_transfer_manager(bucket_name, filenames, root=""): if isinstance(result, Exception): print("Failed to upload {} due to exception: {}".format(name, result)) else: - print("uploaded {} to {}.".format(name, bucket.name)) + print("Uploaded {} to {}.".format(name, bucket.name)) def download_blob_chunks_concurrently_with_transfer_manager(bucket_name, blob_name, local_filename, chunk_size=200 * 1024 * 1024): From d519abb1c3f0b01dd470e35533eafb6713220f7e Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 28 Nov 2022 14:10:45 -0800 Subject: [PATCH 3/5] snippet and snippets_test.py linting --- samples/snippets/snippets_test.py | 74 ++++++++++++++------ samples/snippets/storage_transfer_manager.py | 27 ++++--- 2 files changed, 69 insertions(+), 32 deletions(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 972a31cfa..3033bdcd3 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -70,12 +70,11 @@ import storage_set_bucket_default_kms_key import storage_set_client_endpoint import storage_set_metadata +import storage_transfer_manager import storage_upload_file import storage_upload_from_memory import storage_upload_from_stream import storage_upload_with_kms_key -import storage_transfer_manager - KMS_KEY = os.environ["CLOUD_KMS_KEY"] @@ -124,8 +123,8 @@ def 
test_bucket(): def test_public_bucket(): # The new projects don't allow to make a bucket available to public, so # for some tests we need to use the old main project for now. - original_value = os.environ['GOOGLE_CLOUD_PROJECT'] - os.environ['GOOGLE_CLOUD_PROJECT'] = os.environ['MAIN_GOOGLE_CLOUD_PROJECT'] + original_value = os.environ["GOOGLE_CLOUD_PROJECT"] + os.environ["GOOGLE_CLOUD_PROJECT"] = os.environ["MAIN_GOOGLE_CLOUD_PROJECT"] bucket = None while bucket is None or bucket.exists(): storage_client = storage.Client() @@ -135,7 +134,7 @@ def test_public_bucket(): yield bucket bucket.delete(force=True) # Set the value back. - os.environ['GOOGLE_CLOUD_PROJECT'] = original_value + os.environ["GOOGLE_CLOUD_PROJECT"] = original_value @pytest.fixture @@ -244,7 +243,7 @@ def test_download_byte_range(test_blob): storage_download_byte_range.download_byte_range( test_blob.bucket.name, test_blob.name, 0, 4, dest_file.name ) - assert dest_file.read() == b'Hello' + assert dest_file.read() == b"Hello" def test_download_blob(test_blob): @@ -297,7 +296,8 @@ def test_delete_blob(test_blob): def test_make_blob_public(test_public_blob): storage_make_public.make_blob_public( - test_public_blob.bucket.name, test_public_blob.name) + test_public_blob.bucket.name, test_public_blob.name + ) r = requests.get(test_public_blob.public_url) assert r.text == "Hello, is it me you're looking for?" 
@@ -329,7 +329,9 @@ def test_generate_upload_signed_url_v4(test_bucket, capsys): ) requests.put( - url, data=content, headers={"content-type": "application/octet-stream"}, + url, + data=content, + headers={"content-type": "application/octet-stream"}, ) bucket = storage.Client().bucket(test_bucket.name) @@ -411,16 +413,20 @@ def test_versioning(test_bucket, capsys): def test_bucket_lifecycle_management(test_bucket, capsys): - bucket = storage_enable_bucket_lifecycle_management.enable_bucket_lifecycle_management( - test_bucket + bucket = ( + storage_enable_bucket_lifecycle_management.enable_bucket_lifecycle_management( + test_bucket + ) ) out, _ = capsys.readouterr() assert "[]" in out assert "Lifecycle management is enable" in out assert len(list(bucket.lifecycle_rules)) > 0 - bucket = storage_disable_bucket_lifecycle_management.disable_bucket_lifecycle_management( - test_bucket + bucket = ( + storage_disable_bucket_lifecycle_management.disable_bucket_lifecycle_management( + test_bucket + ) ) out, _ = capsys.readouterr() assert "[]" in out @@ -476,7 +482,8 @@ def test_get_service_account(capsys): def test_download_public_file(test_public_blob): storage_make_public.make_blob_public( - test_public_blob.bucket.name, test_public_blob.name) + test_public_blob.bucket.name, test_public_blob.name + ) with tempfile.NamedTemporaryFile() as dest_file: storage_download_public_file.download_public_file( test_public_blob.bucket.name, test_public_blob.name, dest_file.name @@ -486,8 +493,10 @@ def test_download_public_file(test_public_blob): def test_define_bucket_website_configuration(test_bucket): - bucket = storage_define_bucket_website_configuration.define_bucket_website_configuration( - test_bucket.name, "index.html", "404.html" + bucket = ( + storage_define_bucket_website_configuration.define_bucket_website_configuration( + test_bucket.name, "index.html", "404.html" + ) ) website_val = {"mainPageSuffix": "index.html", "notFoundPage": "404.html"} @@ -550,7 +559,7 @@ def 
test_change_default_storage_class(test_bucket, capsys): ) out, _ = capsys.readouterr() assert "Default storage class for bucket" in out - assert bucket.storage_class == 'COLDLINE' + assert bucket.storage_class == "COLDLINE" def test_change_file_storage_class(test_blob, capsys): @@ -559,7 +568,7 @@ def test_change_file_storage_class(test_blob, capsys): ) out, _ = capsys.readouterr() assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out - assert blob.storage_class == 'NEARLINE' + assert blob.storage_class == "NEARLINE" def test_copy_file_archived_generation(test_blob): @@ -611,14 +620,19 @@ def test_batch_request(test_bucket): def test_storage_set_client_endpoint(capsys): - storage_set_client_endpoint.set_client_endpoint('https://storage.googleapis.com') + storage_set_client_endpoint.set_client_endpoint("https://storage.googleapis.com") out, _ = capsys.readouterr() assert "client initiated with endpoint: https://storage.googleapis.com" in out def test_transfer_manager_snippets(test_bucket, capsys): - BLOB_NAMES = ["test.txt", "test2.txt", "/blobs/test.txt", "blobs/nesteddir/test.txt"] + BLOB_NAMES = [ + "test.txt", + "test2.txt", + "/blobs/test.txt", + "blobs/nesteddir/test.txt", + ] BIG_BLOB_NAME = "bigblob.txt" ALL_NAMES = BLOB_NAMES + [BIG_BLOB_NAME] TEST_DATA_24_BYTES = b"I am a rather big blob! " @@ -638,7 +652,9 @@ def test_transfer_manager_snippets(test_bucket, capsys): with open("{}/{}".format(uploads, BIG_BLOB_NAME), "wb") as f: f.write(TEST_DATA_24_BYTES * SIZE_MULTIPLIER) - storage_transfer_manager.upload_many_blobs_with_transfer_manager(test_bucket.name, ALL_NAMES, root="{}/".format(uploads)) + storage_transfer_manager.upload_many_blobs_with_transfer_manager( + test_bucket.name, ALL_NAMES, root="{}/".format(uploads) + ) out, _ = capsys.readouterr() for name in ALL_NAMES: @@ -647,14 +663,26 @@ def test_transfer_manager_snippets(test_bucket, capsys): with tempfile.TemporaryDirectory() as downloads: # First let's download the bigger file in chunks. 
big_destination_path = "{}/chunkeddl.txt".format(downloads) - storage_transfer_manager.download_blob_chunks_concurrently_with_transfer_manager(test_bucket.name, BIG_BLOB_NAME, big_destination_path, chunk_size=SIZE_MULTIPLIER) + storage_transfer_manager.download_blob_chunks_concurrently_with_transfer_manager( + test_bucket.name, + BIG_BLOB_NAME, + big_destination_path, + chunk_size=SIZE_MULTIPLIER, + ) out, _ = capsys.readouterr() - assert "Downloaded {} to {} in {} chunk(s).".format(BIG_BLOB_NAME, big_destination_path, len(TEST_DATA_24_BYTES)) in out + assert ( + "Downloaded {} to {} in {} chunk(s).".format( + BIG_BLOB_NAME, big_destination_path, len(TEST_DATA_24_BYTES) + ) + in out + ) # Now all the smaller files, plus the big file again because it's # still in the bucket. - storage_transfer_manager.download_all_blobs_with_transfer_manager(test_bucket.name, path_root="{}/".format(downloads)) + storage_transfer_manager.download_all_blobs_with_transfer_manager( + test_bucket.name, path_root="{}/".format(downloads) + ) out, _ = capsys.readouterr() for name in ALL_NAMES: diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index 5cce5d873..70ec098c8 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from google.cloud.storage import Client -from google.cloud.storage import transfer_manager +from google.cloud.storage import Client, transfer_manager + def download_all_blobs_with_transfer_manager(bucket_name, path_root=""): """Download all of the blobs in a bucket, concurrently in a thread pool. 
@@ -37,7 +37,9 @@ def download_all_blobs_with_transfer_manager(bucket_name, path_root=""): blob_names = [blob.name for blob in bucket.list_blobs()] - results = transfer_manager.download_many_to_path(bucket, blob_names, path_root=path_root) + results = transfer_manager.download_many_to_path( + bucket, blob_names, path_root=path_root + ) for name, result in zip(blob_names, results): # The results list is either `None` or an exception for each blob in @@ -46,7 +48,7 @@ def download_all_blobs_with_transfer_manager(bucket_name, path_root=""): if isinstance(result, Exception): print("Failed to download {} due to exception: {}".format(name, result)) else: - print("Downloaded {} to {}.".format(name, path_root+name)) + print("Downloaded {} to {}.".format(name, path_root + name)) def upload_many_blobs_with_transfer_manager(bucket_name, filenames, root=""): @@ -87,7 +89,9 @@ def upload_many_blobs_with_transfer_manager(bucket_name, filenames, root=""): print("Uploaded {} to {}.".format(name, bucket.name)) -def download_blob_chunks_concurrently_with_transfer_manager(bucket_name, blob_name, local_filename, chunk_size=200 * 1024 * 1024): +def download_blob_chunks_concurrently_with_transfer_manager( + bucket_name, blob_name, local_filename, chunk_size=200 * 1024 * 1024 +): """Download a single blob, in chunks, concurrently in a thread pool. This is intended for use with very large blobs.""" @@ -118,10 +122,15 @@ def download_blob_chunks_concurrently_with_transfer_manager(bucket_name, blob_na # Unlike other transfer manager functions, which handle multiple files and # return exceptions in a list, this function will simply raise any exception # it encounters, and has no return value. - transfer_manager.download_chunks_concurrently_to_file(blob, local_file, chunk_size=chunk_size) + transfer_manager.download_chunks_concurrently_to_file( + blob, local_file, chunk_size=chunk_size + ) # If we've gotten this far, it must have been successful. 
- number_of_chunks = -(blob.size//-chunk_size) # Ceiling division - print("Downloaded {} to {} in {} chunk(s).".format(blob_name, local_file.name, number_of_chunks)) - + number_of_chunks = -(blob.size // -chunk_size) # Ceiling division + print( + "Downloaded {} to {} in {} chunk(s).".format( + blob_name, local_file.name, number_of_chunks + ) + ) From 681ec32d84660fd347b3ee8da89aaeaf76958af6 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 1 Dec 2022 16:43:44 -0800 Subject: [PATCH 4/5] snippets; recursive directory creation; rename some params --- google/cloud/storage/transfer_manager.py | 106 +++++++++++-------- samples/snippets/snippets_test.py | 14 +-- samples/snippets/storage_transfer_manager.py | 93 ++++++++-------- tests/unit/test_transfer_manager.py | 59 +++++++++-- 4 files changed, 171 insertions(+), 101 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 8bdae49ca..dd2519229 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -16,10 +16,17 @@ import concurrent.futures +import os import tempfile +import warnings from google.api_core import exceptions +warnings.warn( + "The module `transfer_manager` is a preview feature. Functionality and API " + "may change. This warning will be removed in a future release." +) + DEFAULT_CHUNK_SIZE = 200 * 1024 * 1024 @@ -305,7 +312,7 @@ def download_range_via_tempfile(blob, start, end, download_kwargs): def upload_many_from_filenames( bucket, filenames, - root="", + source_directory="", blob_name_prefix="", skip_if_exists=False, blob_constructor_kwargs=None, @@ -321,10 +328,10 @@ def upload_many_from_filenames( The destination blobs are automatically created, with blob names based on the source filenames and the blob_name_prefix. 
- For example, if the `filenames` include "images/icon.jpg", `root` is - "/home/myuser/", and `blob_name_prefix` is "myfiles/", then the file at - "/home/myuser/images/icon.jpg" will be uploaded to a blob named - "myfiles/images/icon.jpg". + For example, if the `filenames` include "images/icon.jpg", + `source_directory` is "/home/myuser/", and `blob_name_prefix` is "myfiles/", + then the file at "/home/myuser/images/icon.jpg" will be uploaded to a blob + named "myfiles/images/icon.jpg". :type bucket: 'google.cloud.storage.bucket.Bucket' :param bucket: @@ -333,24 +340,24 @@ def upload_many_from_filenames( :type filenames: list(str) :param filenames: A list of filenames to be uploaded. This may include part of the path. - The full path to the file must be root + filename. The filename is - separate from the root because the filename will also determine the - name of the destination blob. + The full path to the file must be source_directory + filename. - :type root: str - :param root: - A string that will be prepended to each filename in the input list, in - order to find the source file for each blob. Unlike the filename itself, - the root string does not affect the name of the uploaded blob itself. - The root string will usually end in "/" (or "\\" depending on platform) - but is not required to do so. + :type source_directory: str + :param source_directory: + A string that will be prepended (with os.path.join()) to each filename + in the input list, in order to find the source file for each blob. + Unlike the filename itself, the source_directory does not affect the + name of the uploaded blob. - For instance, if the root string is "/tmp/img-" and a filename is + For instance, if the source_directory is "/tmp/img/" and a filename is "0001.jpg", with an empty blob_name_prefix, then the file uploaded will - be "/tmp/img-0001.jpg" and the destination blob will be "0001.jpg". + be "/tmp/img/0001.jpg" and the destination blob will be "0001.jpg". 
This parameter can be an empty string. + Note that this parameter allows directory traversal (e.g. "/", "../") + and is not intended for unsanitized end user input. + :type blob_name_prefix: str :param blob_name_prefix: A string that will be prepended to each filename in the input list, in @@ -358,10 +365,10 @@ def upload_many_from_filenames( itself, the prefix string does not affect the location the library will look for the source data on the local filesystem. - For instance, if the root is "/tmp/img-", the blob_name_prefix is - "myuser/mystuff-" and a filename is "0001.jpg" then the file uploaded - will be "/tmp/img-0001.jpg" and the destination blob will be - "myuser/mystuff-0001.jpg". + For instance, if the source_directory is "/tmp/img/", the + blob_name_prefix is "myuser/mystuff-" and a filename is "0001.jpg" then + the file uploaded will be "/tmp/img/0001.jpg" and the destination blob + will be "myuser/mystuff-0001.jpg". The blob_name_prefix can be blank (an empty string). @@ -426,7 +433,7 @@ def upload_many_from_filenames( file_blob_pairs = [] for filename in filenames: - path = root + filename + path = os.path.join(source_directory, filename) blob_name = blob_name_prefix + filename blob = bucket.blob(blob_name, **blob_constructor_kwargs) file_blob_pairs.append((path, blob)) @@ -444,26 +451,27 @@ def upload_many_from_filenames( def download_many_to_path( bucket, blob_names, - path_root="", + destination_directory="", blob_name_prefix="", download_kwargs=None, max_workers=None, deadline=None, + create_directories=True, raise_exception=False, ): """Download many files concurrently by their blob names. This function is a PREVIEW FEATURE: the API may change in a future version. - The destination files are automatically created, with filenames based on - the source blob_names and the path_root. + The destination files are automatically created, with paths based on the + source blob_names and the destination_directory. 
The destination files are not automatically deleted if their downloads fail, so please check the return value of this function for any exceptions, or enable `raise_exception=True`, and process the files accordingly. - For example, if the `blob_names` include "icon.jpg", `path_root` is - "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named + For example, if the `blob_names` include "icon.jpg", `destination_directory` + is "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named "images/icon.jpg" will be downloaded to a file named "/home/myuser/icon.jpg". @@ -482,26 +490,31 @@ def download_many_to_path( the blob names that need not be part of the destination path should be included in the blob_name_prefix. - :type path_root: str - :param path_root: - A string that will be prepended to each blob_name in the input list, - in order to determine the destination path for that blob. The path_root - string will usually end in "/" (or "\\" depending on platform) but is - not required to do so. For instance, if the path_root string is - "/tmp/img-" and a blob_name is "0001.jpg", with an empty - blob_name_prefix, then the source blob "0001.jpg" will be downloaded to - destination "/tmp/img-0001.jpg" . This parameter can be an empty string. + :type destination_directory: str + :param destination_directory: + A string that will be prepended (with os.path.join()) to each blob_name + in the input list, in order to determine the destination path for that + blob. + + For instance, if the destination_directory string is "/tmp/img" and a + blob_name is "0001.jpg", with an empty blob_name_prefix, then the source + blob "0001.jpg" will be downloaded to destination "/tmp/img/0001.jpg" . + + This parameter can be an empty string. + + Note that this parameter allows directory traversal (e.g. "/", "../") + and is not intended for unsanitized end user input. 
:type blob_name_prefix: str :param blob_name_prefix: A string that will be prepended to each blob_name in the input list, in order to determine the name of the source blob. Unlike the blob_name itself, the prefix string does not affect the destination path on the - local filesystem. For instance, if the path_root is "/tmp/img-", the - blob_name_prefix is "myuser/mystuff-" and a blob_name is "0001.jpg" then - the source blob "myuser/mystuff-0001.jpg" will be downloaded to - "/tmp/img-0001.jpg". The blob_name_prefix can be blank (an empty - string). + local filesystem. For instance, if the destination_directory is + "/tmp/img/", the blob_name_prefix is "myuser/mystuff-" and a blob_name + is "0001.jpg" then the source blob "myuser/mystuff-0001.jpg" will be + downloaded to "/tmp/img/0001.jpg". The blob_name_prefix can be blank + (an empty string). :type download_kwargs: dict :param download_kwargs: @@ -523,6 +536,12 @@ def download_many_to_path( progress and concurrent.futures.TimeoutError will be raised. This can be left as the default of None (no deadline) for most use cases. + :type create_directories: bool + :param create_directories: + If True, recursively create any directories that do not exist. For + instance, if downloading object "images/img001.png", create the + directory "images" before downloading. 
+ :type raise_exception: bool :param raise_exception: If True, instead of adding exceptions to the list of return values, @@ -545,7 +564,10 @@ def download_many_to_path( for blob_name in blob_names: full_blob_name = blob_name_prefix + blob_name - path = path_root + blob_name + path = os.path.join(destination_directory, blob_name) + if create_directories: + directory, _ = os.path.split(path) + os.makedirs(directory, exist_ok=True) blob_file_pairs.append((bucket.blob(full_blob_name), path)) return download_many( diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 3033bdcd3..5e530ca73 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -630,7 +630,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): BLOB_NAMES = [ "test.txt", "test2.txt", - "/blobs/test.txt", + "blobs/test.txt", "blobs/nesteddir/test.txt", ] BIG_BLOB_NAME = "bigblob.txt" @@ -642,18 +642,18 @@ def test_transfer_manager_snippets(test_bucket, capsys): # Create dirs and nested dirs for name in BLOB_NAMES: relpath = os.path.dirname(name) - os.makedirs("{}/{}".format(uploads, relpath), exist_ok=True) + os.makedirs(os.path.join(uploads, relpath), exist_ok=True) # Create files with nested dirs to exercise directory handling. for name in BLOB_NAMES: - with open("{}/{}".format(uploads, name), "w") as f: + with open(os.path.join(uploads, name), "w") as f: f.write(name) # Also create one somewhat bigger file. 
- with open("{}/{}".format(uploads, BIG_BLOB_NAME), "wb") as f: + with open(os.path.join(uploads, BIG_BLOB_NAME), "wb") as f: f.write(TEST_DATA_24_BYTES * SIZE_MULTIPLIER) storage_transfer_manager.upload_many_blobs_with_transfer_manager( - test_bucket.name, ALL_NAMES, root="{}/".format(uploads) + test_bucket.name, ALL_NAMES, source_directory="{}/".format(uploads) ) out, _ = capsys.readouterr() @@ -662,7 +662,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): with tempfile.TemporaryDirectory() as downloads: # First let's download the bigger file in chunks. - big_destination_path = "{}/chunkeddl.txt".format(downloads) + big_destination_path = os.path.join(downloads, "chunkeddl.txt") storage_transfer_manager.download_blob_chunks_concurrently_with_transfer_manager( test_bucket.name, BIG_BLOB_NAME, @@ -681,7 +681,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): # Now all the smaller files, plus the big file again because it's # still in the bucket. storage_transfer_manager.download_all_blobs_with_transfer_manager( - test_bucket.name, path_root="{}/".format(downloads) + test_bucket.name, destination_directory=os.path.join(downloads, "") ) out, _ = capsys.readouterr() diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index 70ec098c8..a43ff5bb3 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -15,78 +15,85 @@ from google.cloud.storage import Client, transfer_manager -def download_all_blobs_with_transfer_manager(bucket_name, path_root=""): - """Download all of the blobs in a bucket, concurrently in a thread pool. +def upload_many_blobs_with_transfer_manager(bucket_name, filenames, source_directory=""): + """Upload every file in a list to a bucket, concurrently in a thread pool. - The file name of each blob once downloaded is derived from the blob - name and the `path_root `parameter. 
For complete control of the filename - of each blob, use transfer_manager.download_many() instead. + Each blob name is derived from the filename, not including the `root` + parameter. For complete control of the blob name for each file (and other + aspects of individual blob metadata), use transfer_manager.upload_many() + instead. """ # The ID of your GCS bucket # bucket_name = "your-bucket-name" - # The path on your computer to which to download all of the files. This - # string is prepended to the name of each blob and can simply be a prefix, - # not a directory. If it is a directory, be sure to include the trailing - # slash in the path. An empty string means "the current working directory". - # path_root = "" + # A list (or other iterable) of filenames to upload. + # filenames = ["file_1.txt", "file_2.txt"] + + # The directory on your computer that is the root of all of the files in the + # list of filenames. This string is prepended (with os.path.join()) to each + # filename to get the full path to the file. Relative paths and absolute + # paths are both accepted. This string is not included in the name of the + # uploaded blob; it is only used to find the source files. An empty string + # means "the current working directory". Note that this parameter allows + # directory traversal (e.g. "/", "../") and is not intended for unsanitized + # end user input. 
+ # source_directory="" storage_client = Client() bucket = storage_client.bucket(bucket_name) - blob_names = [blob.name for blob in bucket.list_blobs()] - - results = transfer_manager.download_many_to_path( - bucket, blob_names, path_root=path_root - ) + results = transfer_manager.upload_many_from_filenames(bucket, filenames, source_directory=source_directory) - for name, result in zip(blob_names, results): - # The results list is either `None` or an exception for each blob in + for name, result in zip(filenames, results): + # The results list is either `None` or an exception for each filename in # the input list, in order. if isinstance(result, Exception): - print("Failed to download {} due to exception: {}".format(name, result)) + print("Failed to upload {} due to exception: {}".format(name, result)) else: - print("Downloaded {} to {}.".format(name, path_root + name)) + print("Uploaded {} to {}.".format(name, bucket.name)) -def upload_many_blobs_with_transfer_manager(bucket_name, filenames, root=""): - """Upload every file in a list to a bucket, concurrently in a thread pool. +def download_all_blobs_with_transfer_manager(bucket_name, destination_directory=""): + """Download all of the blobs in a bucket, concurrently in a thread pool. - Each blob name is derived from the filename, not including the `root` - parameter. For complete control of the blob name for each file (and other - aspects of individual blob metadata), use transfer_manager.upload_many() - instead. + The filename of each blob once downloaded is derived from the blob name and + the `destination_directory `parameter. For complete control of the filename + of each blob, use transfer_manager.download_many() instead. + + Directories will be created automatically as needed, for instance to + accommodate blob names that include slashes. """ # The ID of your GCS bucket # bucket_name = "your-bucket-name" - # A list (or other iterable) of filenames to upload. 
-    # filenames = ["file_1.txt", "file_2.txt"]
-
-    # The path on your computer that is the root of all of the files in the list
-    # of filenames. This string is prepended to the each filename to get the
-    # full path to the file. Be sure to include the trailing slash. Relative
-    # paths and absolute paths are both accepted. This string is *not* included
-    # in the name of the uploaded blob; it is only used to find the source
-    # files. An empty string means "the current working directory".
-    # root=""
+    # The directory on your computer to which to download all of the files. This
+    # string is prepended (with os.path.join()) to the name of each blob to form
+    # the full path. Relative paths and absolute paths are both accepted. An
+    # empty string means "the current working directory". Note that this
+    # parameter allows directory traversal ("../" etc.) and is not
+    # intended for unsanitized end user input.
+    # destination_directory = ""
 
     storage_client = Client()
     bucket = storage_client.bucket(bucket_name)
 
-    results = transfer_manager.upload_many_from_filenames(bucket, filenames, root=root)
+    blob_names = [blob.name for blob in bucket.list_blobs()]
 
-    for name, result in zip(filenames, results):
-        # The results list is either `None` or an exception for each filename in
+    results = transfer_manager.download_many_to_path(
+        bucket, blob_names, destination_directory=destination_directory
+    )
+
+    for name, result in zip(blob_names, results):
+        # The results list is either `None` or an exception for each blob in
         # the input list, in order. 
if isinstance(result, Exception): - print("Failed to upload {} due to exception: {}".format(name, result)) + print("Failed to download {} due to exception: {}".format(name, result)) else: - print("Uploaded {} to {}.".format(name, bucket.name)) + print("Downloaded {} to {}.".format(name, destination_directory + name)) def download_blob_chunks_concurrently_with_transfer_manager( @@ -105,10 +112,8 @@ def download_blob_chunks_concurrently_with_transfer_manager( # The filename (or path) on your computer to which to download the blob. # local_filename = "your-file.txt" - # The size of each chunk. The file will be divided into as many pieces as - # needed based on this chunk size. For instance, if the chunk size is - # 200 megabytes, a 1.6 gigabyte file would be downloaded in eight pieces - # concurrently. + # The size of each chunk. For instance, if the chunk size is 200 megabytes, + # a 1.6 gigabyte file would be downloaded in eight pieces concurrently. # chunk_size = 200 * 1024 * 1024 # 200 MiB. storage_client = Client() diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index b48748018..9f90337a5 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from google.cloud.storage import transfer_manager +import pytest + +with pytest.warns(UserWarning): + from google.cloud.storage import transfer_manager from google.api_core import exceptions +import os import io import tempfile import unittest @@ -246,7 +250,7 @@ def test_upload_many_from_filenames(self): DEADLINE = 10 EXPECTED_FILE_BLOB_PAIRS = [ - (ROOT + filename, mock.ANY) for filename in FILENAMES + (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES ] with mock.patch( @@ -255,7 +259,7 @@ def test_upload_many_from_filenames(self): transfer_manager.upload_many_from_filenames( bucket, FILENAMES, - ROOT, + source_directory=ROOT, blob_name_prefix=PREFIX, skip_if_exists=True, blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, @@ -305,7 +309,7 @@ def test_upload_many_from_filenames_minimal_args(self): def test_download_many_to_path(self): bucket = mock.Mock() - BLOBNAMES = ["file_a.txt", "file_b.txt"] + BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] PATH_ROOT = "mypath/" BLOB_NAME_PREFIX = "myprefix/" DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} @@ -313,7 +317,7 @@ def test_download_many_to_path(self): DEADLINE = 10 EXPECTED_BLOB_FILE_PAIRS = [ - (mock.ANY, PATH_ROOT + blobname) for blobname in BLOBNAMES + (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES ] with mock.patch( @@ -322,11 +326,12 @@ def test_download_many_to_path(self): transfer_manager.download_many_to_path( bucket, BLOBNAMES, - PATH_ROOT, + destination_directory=PATH_ROOT, blob_name_prefix=BLOB_NAME_PREFIX, download_kwargs=DOWNLOAD_KWARGS, max_workers=MAX_WORKERS, deadline=DEADLINE, + create_directories=False, raise_exception=True, ) @@ -337,5 +342,43 @@ def test_download_many_to_path(self): deadline=DEADLINE, raise_exception=True, ) - bucket.blob.assert_any_call(BLOB_NAME_PREFIX + BLOBNAMES[0]) - bucket.blob.assert_any_call(BLOB_NAME_PREFIX + BLOBNAMES[1]) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) + 
+ def test_download_many_to_path_creates_directories(self): + bucket = mock.Mock() + + with tempfile.TemporaryDirectory() as tempdir: + DIR_NAME = "dir_a/dir_b" + BLOBNAMES = [ + "file_a.txt", + "file_b.txt", + os.path.join(DIR_NAME, "file_c.txt"), + ] + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + destination_directory=tempdir, + create_directories=True, + raise_exception=True, + ) + + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=True, + ) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(blobname) + + assert os.path.isdir(os.path.join(tempdir, DIR_NAME)) From dd120749394b04a5baad3b9bf23ee02f3dca8c5b Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 1 Dec 2022 17:23:59 -0800 Subject: [PATCH 5/5] Add directory upload snippet --- samples/snippets/snippets_test.py | 29 +++++++ samples/snippets/storage_transfer_manager.py | 83 ++++++++++++++++++-- 2 files changed, 104 insertions(+), 8 deletions(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 5e530ca73..f863e2b10 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -687,3 +687,32 @@ def test_transfer_manager_snippets(test_bucket, capsys): for name in ALL_NAMES: assert "Downloaded {}".format(name) in out + + +def test_transfer_manager_directory_upload(test_bucket, capsys): + BLOB_NAMES = [ + "dirtest/test.txt", + "dirtest/test2.txt", + "dirtest/blobs/test.txt", + "dirtest/blobs/nesteddir/test.txt", + ] + + with tempfile.TemporaryDirectory() as uploads: + # Create dirs and nested dirs + for name in BLOB_NAMES: + relpath = os.path.dirname(name) + os.makedirs(os.path.join(uploads, relpath), 
exist_ok=True) + + # Create files with nested dirs to exercise directory handling. + for name in BLOB_NAMES: + with open(os.path.join(uploads, name), "w") as f: + f.write(name) + + storage_transfer_manager.upload_directory_with_transfer_manager( + test_bucket.name, directory="{}/".format(uploads) + ) + out, _ = capsys.readouterr() + + assert "Found {}".format(len(BLOB_NAMES)) in out + for name in BLOB_NAMES: + assert "Uploaded {}".format(name) in out diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index a43ff5bb3..34502711d 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -12,16 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from google.cloud.storage import Client, transfer_manager - -def upload_many_blobs_with_transfer_manager(bucket_name, filenames, source_directory=""): +def upload_many_blobs_with_transfer_manager( + bucket_name, filenames, source_directory="" +): """Upload every file in a list to a bucket, concurrently in a thread pool. - Each blob name is derived from the filename, not including the `root` - parameter. For complete control of the blob name for each file (and other - aspects of individual blob metadata), use transfer_manager.upload_many() - instead. + Each blob name is derived from the filename, not including the + `source_directory` parameter. For complete control of the blob name for each + file (and other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. """ # The ID of your GCS bucket @@ -40,10 +40,14 @@ def upload_many_blobs_with_transfer_manager(bucket_name, filenames, source_direc # end user input. 
# source_directory="" + from google.cloud.storage import Client, transfer_manager + storage_client = Client() bucket = storage_client.bucket(bucket_name) - results = transfer_manager.upload_many_from_filenames(bucket, filenames, source_directory=source_directory) + results = transfer_manager.upload_many_from_filenames( + bucket, filenames, source_directory=source_directory + ) for name, result in zip(filenames, results): # The results list is either `None` or an exception for each filename in @@ -55,6 +59,65 @@ def upload_many_blobs_with_transfer_manager(bucket_name, filenames, source_direc print("Uploaded {} to {}.".format(name, bucket.name)) +def upload_directory_with_transfer_manager(bucket_name, directory): + """Upload every file in a directory, including all files in subdirectories. + + Each blob name is derived from the filename, not including the `directory` + parameter itself. For complete control of the blob name for each file (and + other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The directory on your computer to upload. Files in the directory and its + # subdirectories will be uploaded. An empty string means "the current + # working directory". + # directory="" + + from pathlib import Path + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + # Generate a list of paths (in string form) relative to the `directory`. + # This can be done in a single list comprehension, but is expanded into + # multiple lines here for clarity. + + # First, recursively get all files in `directory` as Path objects. + directory_as_path_obj = Path(directory) + paths = directory_as_path_obj.rglob("*") + + # Filter so the list only includes files, not directories themselves. 
+ file_paths = [path for path in paths if path.is_file()] + + # These paths are relative to the current working directory. Next, make them + # relative to `directory` + relative_paths = [path.relative_to(directory) for path in file_paths] + + # Finally, convert them all to strings. + string_paths = [str(path) for path in relative_paths] + + print("Found {} files.".format(len(string_paths))) + + # Start the upload. + results = transfer_manager.upload_many_from_filenames( + bucket, string_paths, source_directory=directory + ) + + for name, result in zip(string_paths, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("Uploaded {} to {}.".format(name, bucket.name)) + + def download_all_blobs_with_transfer_manager(bucket_name, destination_directory=""): """Download all of the blobs in a bucket, concurrently in a thread pool. @@ -77,6 +140,8 @@ def download_all_blobs_with_transfer_manager(bucket_name, destination_directory= # intended for unsanitized end user input. # destination_directory = "" + from google.cloud.storage import Client, transfer_manager + storage_client = Client() bucket = storage_client.bucket(bucket_name) @@ -116,6 +181,8 @@ def download_blob_chunks_concurrently_with_transfer_manager( # a 1.6 gigabyte file would be downloaded in eight pieces concurrently. # chunk_size = 200 * 1024 * 1024 # 200 MiB. + from google.cloud.storage import Client, transfer_manager + storage_client = Client() bucket = storage_client.bucket(bucket_name)