From db868331b1d28fa320e0064fe15581e5d9553e1a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 10 Aug 2022 11:01:14 -0700 Subject: [PATCH 01/19] checkpoint before design doc impl --- google/cloud/storage/blob.py | 51 ++++++++++++++++ google/cloud/storage/bucket.py | 44 ++++++++++++++ tests/unit/test_blob.py | 34 +++++++++++ tests/unit/test_bucket.py | 104 +++++++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 6f4952f44..03bf19d01 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -19,12 +19,14 @@ import base64 import cgi +import concurrent import copy import hashlib from io import BytesIO from io import TextIOWrapper import logging import mimetypes +import math import os import re from urllib.parse import parse_qsl @@ -3871,6 +3873,55 @@ def open( "Supported modes strings are 'r', 'rb', 'rt', 'w', 'wb', and 'wt' only." ) + # TODO FIXME: should this be on the client? + def download_chunks_concurrently_to_file( + self, + file_obj, + chunk_size=200*1024*1024, + max_workers=8, + **download_kwargs + ): + # We must know the size of the object, and set the generation. + if not self.size or not self.generation: + self.reload() + + chunks = math.ceil(chunk_size / self.size) + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit()] # TODO + + @staticmethod + def upload_many( + file_blob_pairs, + skip_if_exists=False, + upload_kwargs=None, + max_workers=8 + ): + # If source_path is a string, iteration may produce unexpected results. + + # TODO: support file handlers too + + if skip_if_exists: + upload_kwargs["if_not_generation_match"] = 0 + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(blob.upload_from_filename, path, **upload_kwargs) for path, blob in file_blob_pairs] + for future in concurrent.futures.as_completed(futures): + pass # TODO handle exception inside call! + + @staticmethod + def download_many( + blob_file_pairs, + download_kwargs=None, + num_workers=8 + ): + # TODO: support file handlers too? + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(blob.download_to_filename, path, **download_kwargs) for blob, path in blob_file_pairs] + for future in concurrent.futures.as_completed(futures): + pass # TODO + cache_control = _scalar_property("cacheControl") """HTTP 'Cache-Control' header for this object. 
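
The TODOs above leave open how exceptions raised inside the worker threads should be surfaced. A minimal sketch of the fan-out/collect pattern this checkpoint is building toward, using only the standard library; the helper name and parameters are illustrative and not part of the patch:

    import concurrent.futures

    def run_all(callables, max_workers=8, raise_exception=False):
        # Fan out to a thread pool; leaving the with-block waits for all futures.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(c) for c in callables]
        results = []
        for future in futures:
            exc = future.exception()  # returns the exception rather than raising it
            if exc is not None and not raise_exception:
                results.append(exc)  # report the failure as a result
            else:
                results.append(future.result())  # re-raises if the call failed
        return results

    # run_all([lambda: "ok", lambda: 1 / 0]) -> ["ok", ZeroDivisionError(...)]
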
diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index a2783fb74..ddb100b4e 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -3334,6 +3334,50 @@ def generate_signed_url( query_parameters=query_parameters, ) + def upload_many_from_filenames( + self, + filenames, + root, + prefix="", + skip_if_exists=False, + blob_constructor_kwargs=None, + upload_kwargs=None, + max_workers=8 + ): + file_blob_pairs = [] + + for filename in filenames: + path = root + filename + blob_name = prefix + filename + blob = self.blob(blob_name, **blob_constructor_kwargs) + file_blob_pairs.append((path, blob)) + + return Blob.upload_many( + file_blob_pairs, + skip_if_exists=skip_if_exists, + upload_kwargs=upload_kwargs, + max_workers=max_workers + ) + + def download_many_to_path( + self, + blobs, + path_root, + strip_prefix="", + download_kwargs=None, + max_workers=8 + ): + blob_file_pairs = [] + + for blob in blobs: + if not blob.name.startswith(strip_prefix): + raise ValueError(f"Blob name {blob.name} does not start with strip_prefix {strip_prefix}.") + stripped_blob_name = blob.name[len(strip_prefix):] + path = path_root + stripped_blob_name + blob_file_pairs.append((blob, path)) + + return Blob.download_many(blob_file_pairs, download_kwargs=download_kwargs, max_workers=max_workers) + def _raise_if_len_differs(expected_len, **generation_match_args): """ diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 638db9f4e..8811b00e1 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -5781,6 +5781,40 @@ def test_open(self): with self.assertRaises(ValueError): blob.open("w", ignore_flush=False) + def test_upload_many(self): + from google.cloud.storage.blob import Blob + + client = self._make_client() + bucket = _Bucket(client) + FILE_BLOB_PAIRS = [ + ("file_a.txt", self._make_one("resources/file_a", bucket=bucket)), + ("file_b.txt", self._make_one("resources/file_b", bucket=bucket)) + ] + FAKE_CONTENT_TYPE = "text/fake" + UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} + EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} + + with mock.patch.object(Blob, "upload_from_filename", autospec=True) as upload_mock: + Blob.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + max_workers=7) + raise Exception(str(upload_mock.call_args_list)) # TODO handle exception inside call! 
+ self.assertEqual(upload_mock.call_count, 2) + for (file, blob) in FILE_BLOB_PAIRS: + upload_mock.assert_any_call(blob, file, **EXPECTED_UPLOAD_KWARGS) + + # def upload_many( + # file_blob_pairs, + # skip_if_exists=False, + # upload_kwargs=None, + # max_workers=8 + # ): + + def test_download_many(self): + pass + class Test__quote(unittest.TestCase): @staticmethod diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index 8f4daeb1d..fbb227d1f 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -4224,6 +4224,110 @@ def test_generate_signed_url_v4_w_bucket_bound_hostname_w_scheme(self): def test_generate_signed_url_v4_w_bucket_bound_hostname_w_bare_hostname(self): self._generate_signed_url_v4_helper(bucket_bound_hostname="cdn.example.com") + def test_upload_many_by_filenames(self): + from google.cloud.storage.blob import Blob + + bucket = self._make_one() + FAKE_ROOT = "fake_assets/" + FAKE_FILENAMES = ["fake_a.txt", "fake_b.txt"] + FAKE_PREFIX = "my_assets/" + CHUNK_SIZE = 1024 * 1024 + BLOB_CONSTRUCTOR_KWARGS = {"chunk_size": CHUNK_SIZE} + FAKE_CONTENT_TYPE = "text/fake" + UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} + MAX_WORKERS = 99 + + with mock.patch.object(Blob, "upload_many", autospec=True) as many_mock: + DESIRED_RETURN_VALUE = "return_string" + many_mock.return_value = DESIRED_RETURN_VALUE + result = bucket.upload_many_from_filenames( + FAKE_FILENAMES, + FAKE_ROOT, + prefix=FAKE_PREFIX, + skip_if_exists=True, + blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, + upload_kwargs=UPLOAD_KWARGS, + max_workers=MAX_WORKERS) + + self.assertEqual(result, DESIRED_RETURN_VALUE) + + many_mock.assert_called_once_with( + mock.ANY, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + max_workers=99 + ) + pairs = many_mock.call_args[0][0] + for index, (path, blob) in enumerate(pairs): + expected_path = FAKE_ROOT + FAKE_FILENAMES[index] + expected_blob_name = FAKE_PREFIX + FAKE_FILENAMES[index] + self.assertEqual(path, expected_path) + self.assertEqual(blob.bucket, bucket) + self.assertEqual(blob.name, expected_blob_name) + self.assertEqual(blob.chunk_size, CHUNK_SIZE) + + def test_download_many_to_path(self): + from google.cloud.storage.blob import Blob + + bucket = self._make_one() + FAKE_FILENAMES = ["fake_a.txt", "fake_b.txt"] + FAKE_PREFIX = "my_assets/" + BLOBS = [ + bucket.blob(FAKE_PREFIX + FAKE_FILENAMES[0]), + bucket.blob(FAKE_PREFIX + FAKE_FILENAMES[1]) + ] + FAKE_ROOT = "fake_assets/" + DOWNLOAD_KWARGS = {"raw_download": True} + MAX_WORKERS = 99 + + with mock.patch.object(Blob, "download_many", autospec=True) as many_mock: + DESIRED_RETURN_VALUE = "return_string" + many_mock.return_value = DESIRED_RETURN_VALUE + result = bucket.download_many_to_path( + BLOBS, + FAKE_ROOT, + strip_prefix=FAKE_PREFIX, + download_kwargs=DOWNLOAD_KWARGS, + max_workers=MAX_WORKERS) + + self.assertEqual(result, DESIRED_RETURN_VALUE) + + many_mock.assert_called_once_with( + mock.ANY, + download_kwargs=DOWNLOAD_KWARGS, + max_workers=99 + ) + pairs = many_mock.call_args[0][0] + for index, (blob, path) in enumerate(pairs): + expected_path = FAKE_ROOT + FAKE_FILENAMES[index] + expected_blob_name = FAKE_PREFIX + FAKE_FILENAMES[index] + self.assertEqual(path, expected_path) + self.assertEqual(blob.bucket, bucket) + self.assertEqual(blob.name, expected_blob_name) + + + def test_download_many_to_path_strip_prefix_error(self): + from google.cloud.storage.blob import Blob + + bucket = self._make_one() + FAKE_FILENAMES = ["fake_a.txt", "fake_b.txt"] + FAKE_PREFIX = 
"my_assets/" + WRONG_PREFIX = "their_assets/" + BLOBS = [ + bucket.blob(FAKE_PREFIX + FAKE_FILENAMES[0]), + bucket.blob(WRONG_PREFIX + FAKE_FILENAMES[1]) + ] + FAKE_ROOT = "fake_assets/" + + with mock.patch.object(Blob, "download_many", autospec=True) as many_mock: + DESIRED_RETURN_VALUE = "return_string" + many_mock.return_value = DESIRED_RETURN_VALUE + with self.assertRaises(ValueError): + bucket.download_many_to_path( + BLOBS, + FAKE_ROOT, + strip_prefix=FAKE_PREFIX) + class Test__item_to_notification(unittest.TestCase): def _call_fut(self, iterator, item): From ff97a51e330af615203e0b4037a98393408bbceb Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 17 Aug 2022 20:38:46 -0700 Subject: [PATCH 02/19] checkpoint --- google/cloud/storage/blob.py | 49 ------ google/cloud/storage/bucket.py | 44 ----- google/cloud/storage/transfer_manager.py | 174 ++++++++++++++++++++ tests/unit/test_blob.py | 34 ---- tests/unit/test_bucket.py | 104 ------------ tests/unit/test_transfer_manager.py | 194 +++++++++++++++++++++++ 6 files changed, 368 insertions(+), 231 deletions(-) create mode 100644 google/cloud/storage/transfer_manager.py create mode 100644 tests/unit/test_transfer_manager.py diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 03bf19d01..7b2df376d 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -3873,55 +3873,6 @@ def open( "Supported modes strings are 'r', 'rb', 'rt', 'w', 'wb', and 'wt' only." ) - # TODO FIXME: should this be on the client? - def download_chunks_concurrently_to_file( - self, - file_obj, - chunk_size=200*1024*1024, - max_workers=8, - **download_kwargs - ): - # We must know the size of the object, and set the generation. - if not self.size or not self.generation: - self.reload() - - chunks = math.ceil(chunk_size / self.size) - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit()] # TODO - - @staticmethod - def upload_many( - file_blob_pairs, - skip_if_exists=False, - upload_kwargs=None, - max_workers=8 - ): - # If source_path is a string, iteration may produce unexpected results. - - # TODO: support file handlers too - - if skip_if_exists: - upload_kwargs["if_not_generation_match"] = 0 - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(blob.upload_from_filename, path, **upload_kwargs) for path, blob in file_blob_pairs] - for future in concurrent.futures.as_completed(futures): - pass # TODO handle exception inside call! - - @staticmethod - def download_many( - blob_file_pairs, - download_kwargs=None, - num_workers=8 - ): - # TODO: support file handlers too? - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(blob.download_to_filename, path, **download_kwargs) for blob, path in blob_file_pairs] - for future in concurrent.futures.as_completed(futures): - pass # TODO - cache_control = _scalar_property("cacheControl") """HTTP 'Cache-Control' header for this object. 
diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index ddb100b4e..a2783fb74 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -3334,50 +3334,6 @@ def generate_signed_url( query_parameters=query_parameters, ) - def upload_many_from_filenames( - self, - filenames, - root, - prefix="", - skip_if_exists=False, - blob_constructor_kwargs=None, - upload_kwargs=None, - max_workers=8 - ): - file_blob_pairs = [] - - for filename in filenames: - path = root + filename - blob_name = prefix + filename - blob = self.blob(blob_name, **blob_constructor_kwargs) - file_blob_pairs.append((path, blob)) - - return Blob.upload_many( - file_blob_pairs, - skip_if_exists=skip_if_exists, - upload_kwargs=upload_kwargs, - max_workers=max_workers - ) - - def download_many_to_path( - self, - blobs, - path_root, - strip_prefix="", - download_kwargs=None, - max_workers=8 - ): - blob_file_pairs = [] - - for blob in blobs: - if not blob.name.startswith(strip_prefix): - raise ValueError(f"Blob name {blob.name} does not start with strip_prefix {strip_prefix}.") - stripped_blob_name = blob.name[len(strip_prefix):] - path = path_root + stripped_blob_name - blob_file_pairs.append((blob, path)) - - return Blob.download_many(blob_file_pairs, download_kwargs=download_kwargs, max_workers=max_workers) - def _raise_if_len_differs(expected_len, **generation_match_args): """ diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py new file mode 100644 index 000000000..235184d2a --- /dev/null +++ b/google/cloud/storage/transfer_manager.py @@ -0,0 +1,174 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from google.cloud.storage.blob import Blob +from google.cloud.storage.bucket import Bucket + +import concurrent.futures + +import tempfile + + +def upload_many( + file_blob_pairs, + skip_if_exists=False, + upload_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=False +): + if upload_kwargs is None: + upload_kwargs = {} + if skip_if_exists: + upload_kwargs["if_not_generation_match"] = 0 + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for path_or_file, blob in file_blob_pairs: + method = blob.upload_from_filename if isinstance(path_or_file, str) else blob.upload_from_file + futures.append(executor.submit(method, path_or_file, **upload_kwargs)) + results = [] + concurrent.futures.wait( + futures, + timeout=deadline, + return_when=concurrent.futures.ALL_COMPLETED) + for future in futures: + if not raise_exception: + exp = future.exception() + if exp: + results.append(exp) + continue + results.append(future.result()) + return results + + +def download_many( + blob_file_pairs, + download_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=False +): + if download_kwargs is None: + download_kwargs = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for blob, path_or_file in blob_file_pairs: + method = blob.download_to_filename if isinstance(path_or_file, str) else blob.download_to_file + futures.append(executor.submit(method, path_or_file, **download_kwargs)) + results = [] + concurrent.futures.wait(futures, timeout=deadline, + return_when=concurrent.futures.ALL_COMPLETED) + for future in futures: + if not raise_exception: + exp = future.exception() + if exp: + results.append(exp) + continue + results.append(future.result()) + return results + + +def download_chunks_concurrently_to_file( + blob, + file_obj, + chunk_size=200*1024*1024, + max_workers=None, + download_kwargs=None, + deadline=None +): + # We must know the size of the object, and the generation. + if not blob.size or not blob.generation: + blob.reload() + + chunks = math.ceil(chunk_size / blob.size) + + def download_range_via_tempfile(blob, file_obj, start, end, download_kwargs): + tmp = tempfile.TemporaryFile() + blob.download_to_file(tmp, start=start, end=end, **download_kwargs) + return tmp + + futures = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + cursor = 0 + while cursor < blob.size: + start = cursor + cursor = min(cursor+chunk_size, blob.size) + futures.append( + executor.submit(download_range_via_tempfile, blob, file_obj, start=start, end=cursor-1, download_kwargs=download_kwargs)) + + # Wait until all futures are done and process them in order. 
+ concurrent.futures.wait(timeout=deadline, + return_when=concurrent.futures.ALL_COMPLETED) + for future in futures: + tmp = future.result() + tmp.seek(0) + file_obj.write(tmp.read()) + + +def upload_many_from_filenames( + bucket, + filenames, + root, + prefix="", + skip_if_exists=False, + blob_constructor_kwargs=None, + upload_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=False +): + file_blob_pairs = [] + + for filename in filenames: + path = root + filename + blob_name = prefix + filename + blob = bucket.blob(blob_name, **blob_constructor_kwargs) + file_blob_pairs.append((path, blob)) + + return upload_many( + file_blob_pairs, + skip_if_exists=skip_if_exists, + upload_kwargs=upload_kwargs, + max_workers=max_workers, + deadline=deadline, + raise_exception=False + ) + + +def download_many_to_path( + bucket, + blob_names, + path_root, + blob_name_prefix="", + download_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=False +): + blob_file_pairs = [] + + for blob_name in blob_names: + full_blob_name = blob_name_prefix + blob_name + path = path_root + blob_name + blob_file_pairs.append((bucket.blob(full_blob_name), path)) + + return download_many( + blob_file_pairs, + download_kwargs=download_kwargs, + max_workers=max_workers, + deadline=deadline, + raise_exception=False + ) diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 8811b00e1..638db9f4e 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -5781,40 +5781,6 @@ def test_open(self): with self.assertRaises(ValueError): blob.open("w", ignore_flush=False) - def test_upload_many(self): - from google.cloud.storage.blob import Blob - - client = self._make_client() - bucket = _Bucket(client) - FILE_BLOB_PAIRS = [ - ("file_a.txt", self._make_one("resources/file_a", bucket=bucket)), - ("file_b.txt", self._make_one("resources/file_b", bucket=bucket)) - ] - FAKE_CONTENT_TYPE = "text/fake" - UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} - - with mock.patch.object(Blob, "upload_from_filename", autospec=True) as upload_mock: - Blob.upload_many( - FILE_BLOB_PAIRS, - skip_if_exists=True, - upload_kwargs=UPLOAD_KWARGS, - max_workers=7) - raise Exception(str(upload_mock.call_args_list)) # TODO handle exception inside call! 
- self.assertEqual(upload_mock.call_count, 2) - for (file, blob) in FILE_BLOB_PAIRS: - upload_mock.assert_any_call(blob, file, **EXPECTED_UPLOAD_KWARGS) - - # def upload_many( - # file_blob_pairs, - # skip_if_exists=False, - # upload_kwargs=None, - # max_workers=8 - # ): - - def test_download_many(self): - pass - class Test__quote(unittest.TestCase): @staticmethod diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index fbb227d1f..8f4daeb1d 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -4224,110 +4224,6 @@ def test_generate_signed_url_v4_w_bucket_bound_hostname_w_scheme(self): def test_generate_signed_url_v4_w_bucket_bound_hostname_w_bare_hostname(self): self._generate_signed_url_v4_helper(bucket_bound_hostname="cdn.example.com") - def test_upload_many_by_filenames(self): - from google.cloud.storage.blob import Blob - - bucket = self._make_one() - FAKE_ROOT = "fake_assets/" - FAKE_FILENAMES = ["fake_a.txt", "fake_b.txt"] - FAKE_PREFIX = "my_assets/" - CHUNK_SIZE = 1024 * 1024 - BLOB_CONSTRUCTOR_KWARGS = {"chunk_size": CHUNK_SIZE} - FAKE_CONTENT_TYPE = "text/fake" - UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - MAX_WORKERS = 99 - - with mock.patch.object(Blob, "upload_many", autospec=True) as many_mock: - DESIRED_RETURN_VALUE = "return_string" - many_mock.return_value = DESIRED_RETURN_VALUE - result = bucket.upload_many_from_filenames( - FAKE_FILENAMES, - FAKE_ROOT, - prefix=FAKE_PREFIX, - skip_if_exists=True, - blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, - upload_kwargs=UPLOAD_KWARGS, - max_workers=MAX_WORKERS) - - self.assertEqual(result, DESIRED_RETURN_VALUE) - - many_mock.assert_called_once_with( - mock.ANY, - skip_if_exists=True, - upload_kwargs=UPLOAD_KWARGS, - max_workers=99 - ) - pairs = many_mock.call_args[0][0] - for index, (path, blob) in enumerate(pairs): - expected_path = FAKE_ROOT + FAKE_FILENAMES[index] - expected_blob_name = FAKE_PREFIX + FAKE_FILENAMES[index] - self.assertEqual(path, expected_path) - self.assertEqual(blob.bucket, bucket) - self.assertEqual(blob.name, expected_blob_name) - self.assertEqual(blob.chunk_size, CHUNK_SIZE) - - def test_download_many_to_path(self): - from google.cloud.storage.blob import Blob - - bucket = self._make_one() - FAKE_FILENAMES = ["fake_a.txt", "fake_b.txt"] - FAKE_PREFIX = "my_assets/" - BLOBS = [ - bucket.blob(FAKE_PREFIX + FAKE_FILENAMES[0]), - bucket.blob(FAKE_PREFIX + FAKE_FILENAMES[1]) - ] - FAKE_ROOT = "fake_assets/" - DOWNLOAD_KWARGS = {"raw_download": True} - MAX_WORKERS = 99 - - with mock.patch.object(Blob, "download_many", autospec=True) as many_mock: - DESIRED_RETURN_VALUE = "return_string" - many_mock.return_value = DESIRED_RETURN_VALUE - result = bucket.download_many_to_path( - BLOBS, - FAKE_ROOT, - strip_prefix=FAKE_PREFIX, - download_kwargs=DOWNLOAD_KWARGS, - max_workers=MAX_WORKERS) - - self.assertEqual(result, DESIRED_RETURN_VALUE) - - many_mock.assert_called_once_with( - mock.ANY, - download_kwargs=DOWNLOAD_KWARGS, - max_workers=99 - ) - pairs = many_mock.call_args[0][0] - for index, (blob, path) in enumerate(pairs): - expected_path = FAKE_ROOT + FAKE_FILENAMES[index] - expected_blob_name = FAKE_PREFIX + FAKE_FILENAMES[index] - self.assertEqual(path, expected_path) - self.assertEqual(blob.bucket, bucket) - self.assertEqual(blob.name, expected_blob_name) - - - def test_download_many_to_path_strip_prefix_error(self): - from google.cloud.storage.blob import Blob - - bucket = self._make_one() - FAKE_FILENAMES = ["fake_a.txt", "fake_b.txt"] - FAKE_PREFIX = 
"my_assets/" - WRONG_PREFIX = "their_assets/" - BLOBS = [ - bucket.blob(FAKE_PREFIX + FAKE_FILENAMES[0]), - bucket.blob(WRONG_PREFIX + FAKE_FILENAMES[1]) - ] - FAKE_ROOT = "fake_assets/" - - with mock.patch.object(Blob, "download_many", autospec=True) as many_mock: - DESIRED_RETURN_VALUE = "return_string" - many_mock.return_value = DESIRED_RETURN_VALUE - with self.assertRaises(ValueError): - bucket.download_many_to_path( - BLOBS, - FAKE_ROOT, - strip_prefix=FAKE_PREFIX) - class Test__item_to_notification(unittest.TestCase): def _call_fut(self, iterator, item): diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py new file mode 100644 index 000000000..c53709bc5 --- /dev/null +++ b/tests/unit/test_transfer_manager.py @@ -0,0 +1,194 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud.storage import transfer_manager + +import tempfile +import unittest +import mock + +class Test_Transfer_Manager(unittest.TestCase): + @staticmethod + def _make_client(*args, **kw): + from google.cloud.storage.client import Client + + return mock.create_autospec(Client, instance=True, **kw) + + def test_upload_many_with_filenames(self): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock()), + ("file_b.txt", mock.Mock()) + ] + FAKE_CONTENT_TYPE = "text/fake" + UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} + EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} + FAKE_RESULT = "nothing to see here" + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_filename.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS) + for (filename, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.assert_any_call(filename, **EXPECTED_UPLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_upload_many_with_file_objs(self): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock()), + (tempfile.TemporaryFile(), mock.Mock()) + ] + FAKE_CONTENT_TYPE = "text/fake" + UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} + EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} + FAKE_RESULT = "nothing to see here" + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_file.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS) + for (file, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_upload_many_passes_concurrency_options(self): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock()), + (tempfile.TemporaryFile(), mock.Mock()) + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + transfer_manager.upload_many( + 
FILE_BLOB_PAIRS, + max_workers=MAX_WORKERS, + deadline=DEADLINE) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + def test_upload_many_suppresses_exceptions(self): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock()), + ("file_b.txt", mock.Mock()) + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + for result in results: + self.assertEqual(type(result), ConnectionError) + + def test_upload_many_raises_exceptions(self): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock()), + ("file_b.txt", mock.Mock()) + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + with self.assertRaises(ConnectionError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + raise_exception=True) + + def test_download_many_with_filenames(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), "file_a.txt"), + (mock.Mock(), "file_b.txt") + ] + FAKE_ENCODING = "fake_gzip" + DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} + FAKE_RESULT = "nothing to see here" + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_filename.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_download_many_with_file_objs(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), tempfile.TemporaryFile()), + (mock.Mock(), tempfile.TemporaryFile()) + ] + FAKE_ENCODING = "fake_gzip" + DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} + FAKE_RESULT = "nothing to see here" + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_file.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_download_many_passes_concurrency_options(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), tempfile.TemporaryFile()), + (mock.Mock(), tempfile.TemporaryFile()) + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + transfer_manager.download_many( + BLOB_FILE_PAIRS, + max_workers=MAX_WORKERS, + deadline=DEADLINE) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + def test_download_many_suppresses_exceptions(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), "file_a.txt"), + (mock.Mock(), "file_b.txt") + ] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_from_filename.side_effect = ConnectionError() + + results = transfer_manager.download_many(BLOB_FILE_PAIRS) + for result in results: + self.assertEqual(type(result), ConnectionError) + + def test_download_many_raises_exceptions(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), "file_a.txt"), + (mock.Mock(), "file_b.txt") + ] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_from_filename.side_effect = ConnectionError() + + results = transfer_manager.download_many(BLOB_FILE_PAIRS) + for result in results: + 
self.assertEqual(type(result), ConnectionError) + + with self.assertRaises(ConnectionError): + transfer_manager.download_many( + FILE_BLOB_PAIRS, + raise_exception=True) From 97f8f26b5a64dc26f19fb99e27f9982e943ef5f7 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 18 Aug 2022 18:41:10 -0700 Subject: [PATCH 03/19] more tests --- tests/unit/test_transfer_manager.py | 32 ++++++++++++++--------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index c53709bc5..70002124e 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -176,19 +176,19 @@ def test_download_many_suppresses_exceptions(self): for result in results: self.assertEqual(type(result), ConnectionError) - def test_download_many_raises_exceptions(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), "file_a.txt"), - (mock.Mock(), "file_b.txt") - ] - for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_from_filename.side_effect = ConnectionError() - - results = transfer_manager.download_many(BLOB_FILE_PAIRS) - for result in results: - self.assertEqual(type(result), ConnectionError) - - with self.assertRaises(ConnectionError): - transfer_manager.download_many( - FILE_BLOB_PAIRS, - raise_exception=True) + # def test_download_many_raises_exceptions(self): + # BLOB_FILE_PAIRS = [ + # (mock.Mock(), "file_a.txt"), + # (mock.Mock(), "file_b.txt") + # ] + # for mock_blob, _ in BLOB_FILE_PAIRS: + # mock_blob.download_from_filename.side_effect = ConnectionError() + + # results = transfer_manager.download_many(BLOB_FILE_PAIRS) + # for result in results: + # self.assertEqual(type(result), ConnectionError) + + # with self.assertRaises(ConnectionError): + # transfer_manager.download_many( + # FILE_BLOB_PAIRS, + # raise_exception=True) From 93517b0c41f08316d1aef217e38f98d033a109e4 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 19 Aug 2022 11:42:29 -0700 Subject: [PATCH 04/19] code and tests for transfer manager complete --- google/cloud/storage/blob.py | 2 - google/cloud/storage/transfer_manager.py | 62 +++--- tests/unit/test_transfer_manager.py | 248 ++++++++++++++++------- 3 files changed, 215 insertions(+), 97 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 7b2df376d..6f4952f44 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -19,14 +19,12 @@ import base64 import cgi -import concurrent import copy import hashlib from io import BytesIO from io import TextIOWrapper import logging import mimetypes -import math import os import re from urllib.parse import parse_qsl diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 235184d2a..72a9d818f 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from google.cloud.storage.blob import Blob -from google.cloud.storage.bucket import Bucket - import concurrent.futures import tempfile @@ -26,7 +23,7 @@ def upload_many( upload_kwargs=None, max_workers=None, deadline=None, - raise_exception=False + raise_exception=False, ): if upload_kwargs is None: upload_kwargs = {} @@ -36,13 +33,16 @@ def upload_many( with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for path_or_file, blob in file_blob_pairs: - method = blob.upload_from_filename if isinstance(path_or_file, str) else blob.upload_from_file + method = ( + blob.upload_from_filename + if isinstance(path_or_file, str) + else blob.upload_from_file + ) futures.append(executor.submit(method, path_or_file, **upload_kwargs)) results = [] concurrent.futures.wait( - futures, - timeout=deadline, - return_when=concurrent.futures.ALL_COMPLETED) + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) for future in futures: if not raise_exception: exp = future.exception() @@ -58,18 +58,23 @@ def download_many( download_kwargs=None, max_workers=None, deadline=None, - raise_exception=False + raise_exception=False, ): if download_kwargs is None: download_kwargs = {} with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for blob, path_or_file in blob_file_pairs: - method = blob.download_to_filename if isinstance(path_or_file, str) else blob.download_to_file + method = ( + blob.download_to_filename + if isinstance(path_or_file, str) + else blob.download_to_file + ) futures.append(executor.submit(method, path_or_file, **download_kwargs)) results = [] - concurrent.futures.wait(futures, timeout=deadline, - return_when=concurrent.futures.ALL_COMPLETED) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) for future in futures: if not raise_exception: exp = future.exception() @@ -83,17 +88,17 @@ def download_many( def download_chunks_concurrently_to_file( blob, file_obj, - chunk_size=200*1024*1024, + chunk_size=200 * 1024 * 1024, max_workers=None, download_kwargs=None, - deadline=None + deadline=None, ): + if download_kwargs is None: + download_kwargs = {} # We must know the size of the object, and the generation. if not blob.size or not blob.generation: blob.reload() - chunks = math.ceil(chunk_size / blob.size) - def download_range_via_tempfile(blob, file_obj, start, end, download_kwargs): tmp = tempfile.TemporaryFile() blob.download_to_file(tmp, start=start, end=end, **download_kwargs) @@ -105,13 +110,22 @@ def download_range_via_tempfile(blob, file_obj, start, end, download_kwargs): cursor = 0 while cursor < blob.size: start = cursor - cursor = min(cursor+chunk_size, blob.size) + cursor = min(cursor + chunk_size, blob.size) futures.append( - executor.submit(download_range_via_tempfile, blob, file_obj, start=start, end=cursor-1, download_kwargs=download_kwargs)) + executor.submit( + download_range_via_tempfile, + blob, + file_obj, + start=start, + end=cursor - 1, + download_kwargs=download_kwargs, + ) + ) # Wait until all futures are done and process them in order. 
- concurrent.futures.wait(timeout=deadline, - return_when=concurrent.futures.ALL_COMPLETED) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) for future in futures: tmp = future.result() tmp.seek(0) @@ -128,7 +142,7 @@ def upload_many_from_filenames( upload_kwargs=None, max_workers=None, deadline=None, - raise_exception=False + raise_exception=False, ): file_blob_pairs = [] @@ -144,7 +158,7 @@ def upload_many_from_filenames( upload_kwargs=upload_kwargs, max_workers=max_workers, deadline=deadline, - raise_exception=False + raise_exception=raise_exception, ) @@ -156,7 +170,7 @@ def download_many_to_path( download_kwargs=None, max_workers=None, deadline=None, - raise_exception=False + raise_exception=False, ): blob_file_pairs = [] @@ -170,5 +184,5 @@ def download_many_to_path( download_kwargs=download_kwargs, max_workers=max_workers, deadline=deadline, - raise_exception=False + raise_exception=raise_exception, ) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 70002124e..63b09c99e 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -14,22 +14,15 @@ from google.cloud.storage import transfer_manager +import io import tempfile import unittest import mock -class Test_Transfer_Manager(unittest.TestCase): - @staticmethod - def _make_client(*args, **kw): - from google.cloud.storage.client import Client - - return mock.create_autospec(Client, instance=True, **kw) +class Test_Transfer_Manager(unittest.TestCase): def test_upload_many_with_filenames(self): - FILE_BLOB_PAIRS = [ - ("file_a.txt", mock.Mock()), - ("file_b.txt", mock.Mock()) - ] + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] FAKE_CONTENT_TYPE = "text/fake" UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} @@ -39,18 +32,19 @@ def test_upload_many_with_filenames(self): blob_mock.upload_from_filename.return_value = FAKE_RESULT results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, - skip_if_exists=True, - upload_kwargs=UPLOAD_KWARGS) + FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS + ) for (filename, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.assert_any_call(filename, **EXPECTED_UPLOAD_KWARGS) + mock_blob.upload_from_filename.assert_any_call( + filename, **EXPECTED_UPLOAD_KWARGS + ) for result in results: self.assertEqual(result, FAKE_RESULT) def test_upload_many_with_file_objs(self): FILE_BLOB_PAIRS = [ (tempfile.TemporaryFile(), mock.Mock()), - (tempfile.TemporaryFile(), mock.Mock()) + (tempfile.TemporaryFile(), mock.Mock()), ] FAKE_CONTENT_TYPE = "text/fake" UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} @@ -61,9 +55,8 @@ def test_upload_many_with_file_objs(self): blob_mock.upload_from_file.return_value = FAKE_RESULT results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, - skip_if_exists=True, - upload_kwargs=UPLOAD_KWARGS) + FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS + ) for (file, mock_blob) in FILE_BLOB_PAIRS: mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) for result in results: @@ -72,23 +65,23 @@ def test_upload_many_with_file_objs(self): def test_upload_many_passes_concurrency_options(self): FILE_BLOB_PAIRS = [ (tempfile.TemporaryFile(), mock.Mock()), - (tempfile.TemporaryFile(), mock.Mock()) + (tempfile.TemporaryFile(), mock.Mock()), ] MAX_WORKERS = 7 DEADLINE = 10 - with 
mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + with mock.patch( + "concurrent.futures.ThreadPoolExecutor" + ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: transfer_manager.upload_many( - FILE_BLOB_PAIRS, - max_workers=MAX_WORKERS, - deadline=DEADLINE) + FILE_BLOB_PAIRS, max_workers=MAX_WORKERS, deadline=DEADLINE + ) pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + wait_patch.assert_called_with( + mock.ANY, timeout=DEADLINE, return_when=mock.ANY + ) def test_upload_many_suppresses_exceptions(self): - FILE_BLOB_PAIRS = [ - ("file_a.txt", mock.Mock()), - ("file_b.txt", mock.Mock()) - ] + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] for _, mock_blob in FILE_BLOB_PAIRS: mock_blob.upload_from_filename.side_effect = ConnectionError() @@ -97,23 +90,15 @@ def test_upload_many_suppresses_exceptions(self): self.assertEqual(type(result), ConnectionError) def test_upload_many_raises_exceptions(self): - FILE_BLOB_PAIRS = [ - ("file_a.txt", mock.Mock()), - ("file_b.txt", mock.Mock()) - ] + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] for _, mock_blob in FILE_BLOB_PAIRS: mock_blob.upload_from_filename.side_effect = ConnectionError() with self.assertRaises(ConnectionError): - transfer_manager.upload_many( - FILE_BLOB_PAIRS, - raise_exception=True) + transfer_manager.upload_many(FILE_BLOB_PAIRS, raise_exception=True) def test_download_many_with_filenames(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), "file_a.txt"), - (mock.Mock(), "file_b.txt") - ] + BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] FAKE_ENCODING = "fake_gzip" DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} FAKE_RESULT = "nothing to see here" @@ -122,8 +107,8 @@ def test_download_many_with_filenames(self): blob_mock.download_to_filename.return_value = FAKE_RESULT results = transfer_manager.download_many( - BLOB_FILE_PAIRS, - download_kwargs=DOWNLOAD_KWARGS) + BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS + ) for (mock_blob, file) in BLOB_FILE_PAIRS: mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) for result in results: @@ -132,7 +117,7 @@ def test_download_many_with_filenames(self): def test_download_many_with_file_objs(self): BLOB_FILE_PAIRS = [ (mock.Mock(), tempfile.TemporaryFile()), - (mock.Mock(), tempfile.TemporaryFile()) + (mock.Mock(), tempfile.TemporaryFile()), ] FAKE_ENCODING = "fake_gzip" DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} @@ -142,8 +127,8 @@ def test_download_many_with_file_objs(self): blob_mock.download_to_file.return_value = FAKE_RESULT results = transfer_manager.download_many( - BLOB_FILE_PAIRS, - download_kwargs=DOWNLOAD_KWARGS) + BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS + ) for (mock_blob, file) in BLOB_FILE_PAIRS: mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) for result in results: @@ -152,43 +137,164 @@ def test_download_many_with_file_objs(self): def test_download_many_passes_concurrency_options(self): BLOB_FILE_PAIRS = [ (mock.Mock(), tempfile.TemporaryFile()), - (mock.Mock(), tempfile.TemporaryFile()) + (mock.Mock(), tempfile.TemporaryFile()), ] MAX_WORKERS = 7 DEADLINE = 10 - with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + with mock.patch( + "concurrent.futures.ThreadPoolExecutor" + ) as 
pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: transfer_manager.download_many( - BLOB_FILE_PAIRS, - max_workers=MAX_WORKERS, - deadline=DEADLINE) + BLOB_FILE_PAIRS, max_workers=MAX_WORKERS, deadline=DEADLINE + ) pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + wait_patch.assert_called_with( + mock.ANY, timeout=DEADLINE, return_when=mock.ANY + ) def test_download_many_suppresses_exceptions(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), "file_a.txt"), - (mock.Mock(), "file_b.txt") - ] + BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_from_filename.side_effect = ConnectionError() + mock_blob.download_to_filename.side_effect = ConnectionError() results = transfer_manager.download_many(BLOB_FILE_PAIRS) for result in results: self.assertEqual(type(result), ConnectionError) - # def test_download_many_raises_exceptions(self): - # BLOB_FILE_PAIRS = [ - # (mock.Mock(), "file_a.txt"), - # (mock.Mock(), "file_b.txt") - # ] - # for mock_blob, _ in BLOB_FILE_PAIRS: - # mock_blob.download_from_filename.side_effect = ConnectionError() - - # results = transfer_manager.download_many(BLOB_FILE_PAIRS) - # for result in results: - # self.assertEqual(type(result), ConnectionError) - - # with self.assertRaises(ConnectionError): - # transfer_manager.download_many( - # FILE_BLOB_PAIRS, - # raise_exception=True) + def test_download_many_raises_exceptions(self): + BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + transfer_manager.download_many(BLOB_FILE_PAIRS) + with self.assertRaises(ConnectionError): + transfer_manager.download_many(BLOB_FILE_PAIRS, raise_exception=True) + + def test_download_chunks_concurrently_to_file(self): + BLOB_CONTENTS = b"1234567812345678A" + blob = mock.Mock() + blob.size = len(BLOB_CONTENTS) + blob.generation = None + + FAKE_ENCODING = "fake-gzip" + DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} + + def fake_download_to_file(file_obj, start, end, **kwargs): + file_obj.write(BLOB_CONTENTS[start : end + 1]) + self.assertEqual(kwargs, DOWNLOAD_KWARGS) + + blob.download_to_file = fake_download_to_file + + file_obj = io.BytesIO() + + transfer_manager.download_chunks_concurrently_to_file( + blob, file_obj, chunk_size=4, download_kwargs=DOWNLOAD_KWARGS + ) + + # Generation wasn't set, so reload should have been called. 
+ blob.reload.assert_called_with() + + file_obj.seek(0) + result = file_obj.read() + self.assertEqual(result, BLOB_CONTENTS) + + def test_download_chunks_passes_concurrency_arguments_and_kwargs(self): + blob = mock.Mock() + blob.size = 17 + blob.generation = 1 + + file_obj = mock.Mock() + + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch( + "concurrent.futures.ThreadPoolExecutor" + ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + transfer_manager.download_chunks_concurrently_to_file( + blob, file_obj, chunk_size=4, max_workers=MAX_WORKERS, deadline=DEADLINE + ) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with( + mock.ANY, timeout=DEADLINE, return_when=mock.ANY + ) + + def test_upload_many_from_filenames(self): + bucket = mock.Mock() + + FILENAMES = ["file_a.txt", "file_b.txt"] + ROOT = "mypath/" + PREFIX = "myprefix/" + KEY_NAME = "keyname" + BLOB_CONSTRUCTOR_KWARGS = {"kms_key_name": KEY_NAME} + UPLOAD_KWARGS = {"content-type": "text/fake"} + MAX_WORKERS = 7 + DEADLINE = 10 + + EXPECTED_FILE_BLOB_PAIRS = [ + (ROOT + filename, mock.ANY) for filename in FILENAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, + ROOT, + prefix=PREFIX, + skip_if_exists=True, + blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, + upload_kwargs=UPLOAD_KWARGS, + max_workers=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + max_workers=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) + bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) + + def test_download_many_to_path(self): + bucket = mock.Mock() + + BLOBNAMES = ["file_a.txt", "file_b.txt"] + PATH_ROOT = "mypath/" + BLOB_NAME_PREFIX = "myprefix/" + DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} + MAX_WORKERS = 7 + DEADLINE = 10 + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, PATH_ROOT + blobname) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + PATH_ROOT, + blob_name_prefix=BLOB_NAME_PREFIX, + download_kwargs=DOWNLOAD_KWARGS, + max_workers=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + max_workers=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + BLOBNAMES[0]) + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + BLOBNAMES[1]) From 7666e2fbd15145aa0741b31ca70922436fd74882 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 19 Aug 2022 12:46:18 -0700 Subject: [PATCH 05/19] proactively close temp files when finished reading --- google/cloud/storage/transfer_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 72a9d818f..fa882e9a4 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -130,6 +130,7 @@ def download_range_via_tempfile(blob, file_obj, start, end, download_kwargs): tmp = future.result() tmp.seek(0) file_obj.write(tmp.read()) 
+ tmp.close() def upload_many_from_filenames( From b06e4a0546307d9554d3288546b01fd985685720 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 23 Aug 2022 13:10:23 -0700 Subject: [PATCH 06/19] respond to comments; destroy tmp files as they are consumed --- google/cloud/storage/transfer_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index fa882e9a4..638831206 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -99,7 +99,7 @@ def download_chunks_concurrently_to_file( if not blob.size or not blob.generation: blob.reload() - def download_range_via_tempfile(blob, file_obj, start, end, download_kwargs): + def download_range_via_tempfile(blob, start, end, download_kwargs): tmp = tempfile.TemporaryFile() blob.download_to_file(tmp, start=start, end=end, **download_kwargs) return tmp @@ -115,7 +115,6 @@ def download_range_via_tempfile(blob, file_obj, start, end, download_kwargs): executor.submit( download_range_via_tempfile, blob, - file_obj, start=start, end=cursor - 1, download_kwargs=download_kwargs, From 0a701a69f197a6ddb2a3bc968acf7c2e5aede2a0 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 3 Oct 2022 14:46:50 -0700 Subject: [PATCH 07/19] Add system tests, docstrings, address feedback --- google/cloud/storage/constants.py | 1 + google/cloud/storage/fileio.py | 6 +- google/cloud/storage/transfer_manager.py | 361 ++++++++++++++++++++++- tests/system/test_transfer_manager.py | 100 +++++++ tests/unit/test_transfer_manager.py | 19 +- 5 files changed, 475 insertions(+), 12 deletions(-) create mode 100644 tests/system/test_transfer_manager.py diff --git a/google/cloud/storage/constants.py b/google/cloud/storage/constants.py index babbc5a42..5d6497295 100644 --- a/google/cloud/storage/constants.py +++ b/google/cloud/storage/constants.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + """Constants used across google.cloud.storage modules.""" # Storage classes diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index d3ae135bb..d09a3c885 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Support for file-like I/O.""" +"""Module for file-like access of blobs, usually invoked via Blob.open().""" import io import warnings @@ -101,10 +101,12 @@ class BlobReader(io.BufferedIOBase): - ``if_metageneration_match`` - ``if_metageneration_not_match`` - ``timeout`` + + Note that download_kwargs are also applied to blob.reload(), if a reload + is needed during seek(). """ def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs): - """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: if kwarg not in VALID_DOWNLOAD_KWARGS: raise ValueError( diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 638831206..6a28c89e1 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Concurrent media operations. 
This is a FEATURE PREVIEW: API may change."""
+
 import concurrent.futures
 
 import tempfile
 
+from google.api_core import exceptions
+
 
 def upload_many(
     file_blob_pairs,
@@ -25,10 +29,66 @@ def upload_many(
     deadline=None,
     raise_exception=False,
 ):
+    """Upload many files concurrently via a worker pool.
+
+    This function is a FEATURE PREVIEW: the API may change in a future version.
+
+    :type file_blob_pairs: List(Tuple(IOBase or str, 'google.cloud.storage.blob.Blob'))
+    :param file_blob_pairs:
+        A list of tuples of a file or filename and a blob. Each file will be
+        uploaded to the corresponding blob by using blob.upload_from_file() or
+        blob.upload_from_filename() as appropriate.
+
+    :type skip_if_exists: bool
+    :param skip_if_exists:
+        If True, blobs that already have a live version will not be overwritten.
+        This is accomplished by setting "if_generation_match = 0" on uploads.
+        Uploads so skipped will result in a 412 Precondition Failed response
+        code, which will be included in the return value but not raised
+        as an exception regardless of the value of raise_exception.
+
+    :type upload_kwargs: dict
+    :param upload_kwargs:
+        A dictionary of keyword arguments to pass to the upload method. Refer
+        to the documentation for blob.upload_from_file() or
+        blob.upload_from_filename() for more information. The dict is directly
+        passed into the upload methods and is not validated by this function.
+
+    :type max_workers: int
+    :param max_workers:
+        The number of workers (effectively, the number of threads) to use in
+        the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
+        documentation for details.
+
+    :type deadline: int
+    :param deadline:
+        The number of seconds to wait for all threads to resolve. If the
+        deadline is reached, all threads will be terminated regardless of their
+        progress and concurrent.futures.TimeoutError will be raised.
+
+    :type raise_exception: bool
+    :param raise_exception:
+        If True, exceptions will be raised instead of being added to the list
+        of return values. Note that encountering an exception on one
+        operation will not prevent other operations from starting. Exceptions
+        are only processed and potentially raised after all operations are
+        complete in success or failure.
+
+        If skip_if_exists is True, 412 Precondition Failed responses are
+        considered part of normal operation and are not raised as an exception.
+
+    :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
+
+    :rtype: list
+    :returns: A list of results corresponding to, in order, each item in the
+        input list. If an exception was received, it will be the result
+        for that operation. Otherwise, the return value from the successful
+        upload method is used (typically, None).
+    """
     if upload_kwargs is None:
         upload_kwargs = {}
     if skip_if_exists:
-        upload_kwargs["if_not_generation_match"] = 0
+        upload_kwargs["if_generation_match"] = 0
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
         futures = []
@@ -44,12 +104,18 @@ def upload_many(
             futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
         )
         for future in futures:
-            if not raise_exception:
-                exp = future.exception()
-                if exp:
-                    results.append(exp)
-                    continue
-            results.append(future.result())
+            exp = future.exception()
+
+            # If raise_exception is False, don't call future.result().
+            if exp and not raise_exception:
+                results.append(exp)
+            # If skip_if_exists and the exception is PreconditionFailed, do the same.
+            elif exp and skip_if_exists and isinstance(exp, exceptions.PreconditionFailed):
+                results.append(exp)
+            # Get the real result. If there was an exception not handled above,
+            # this will raise it.
+            else:
+                results.append(future.result())
         return results
 
 
@@ -60,6 +126,52 @@ def download_many(
     download_kwargs=None,
     max_workers=None,
     deadline=None,
     raise_exception=False,
 ):
+    """Download many blobs concurrently via a worker pool.
+
+    This function is a FEATURE PREVIEW: the API may change in a future version.
+
+    :type blob_file_pairs: List(Tuple('google.cloud.storage.blob.Blob', IOBase or str))
+    :param blob_file_pairs:
+        A list of tuples of blob and a file or filename. Each blob will be
+        downloaded to the corresponding file by using blob.download_to_file() or
+        blob.download_to_filename() as appropriate.
+
+    :type download_kwargs: dict
+    :param download_kwargs:
+        A dictionary of keyword arguments to pass to the download method. Refer
+        to the documentation for blob.download_to_file() or
+        blob.download_to_filename() for more information. The dict is directly
+        passed into the download methods and is not validated by this function.
+
+    :type max_workers: int
+    :param max_workers:
+        The number of workers (effectively, the number of threads) to use in
+        the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
+        documentation for details.
+
+    :type deadline: int
+    :param deadline:
+        The number of seconds to wait for all threads to resolve. If the
+        deadline is reached, all threads will be terminated regardless of their
+        progress and concurrent.futures.TimeoutError will be raised.
+
+    :type raise_exception: bool
+    :param raise_exception:
+        If True, exceptions will be raised instead of being added to the list
+        of return values. Note that encountering an exception on one
+        operation will not prevent other operations from starting. Exceptions
+        are only processed and potentially raised after all operations are
+        complete in success or failure.
+
+    :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
+
+    :rtype: list
+    :returns: A list of results corresponding to, in order, each item in the
+        input list. If an exception was received, it will be the result
+        for that operation. Otherwise, the return value from the successful
+        download method is used (typically, None).
+    """
+
     if download_kwargs is None:
         download_kwargs = {}
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -89,10 +201,49 @@ def download_many(
 
 def download_chunks_concurrently_to_file(
     blob,
     file_obj,
     chunk_size=200 * 1024 * 1024,
-    max_workers=None,
     download_kwargs=None,
+    max_workers=None,
     deadline=None,
 ):
+    """Download a single blob in chunks, concurrently.
+
+    This function is a FEATURE PREVIEW: the API may change in a future version.
+
+    Use of this function, in cases where single threads are unable to fully
+    saturate available network bandwidth, may improve download performance for
+    large objects.
+
+    :type blob: 'google.cloud.storage.blob.Blob'
+    :param blob:
+        The blob to download.
+
+    :type file_obj: IOBase
+    :param file_obj: The file object to which the downloaded chunks will be
+        written. Chunks are written in order. While the current implementation
+        of this function does not use seek(), a future version may use seek() to
+        write chunks out of order to improve write performance.
+
+    :type chunk_size: int
+    :param chunk_size: The size of each chunk. An excessively small size may
+        have a negative performance impact, as each chunk will be downloaded in
+        a separate HTTP request.
+ + :type download_kwargs: dict + :param download_kwargs: + A dictionary of keyword arguments to pass to the download method. Refer + to the documentation for blob.download_to_file() or + blob.download_to_filename() for more information. The dict is directly + passed into the download methods and is not validated by this function. + + :type max_workers: int + :param max_workers: + The number of workers (effectively, the number of threads) to use in + the worker pool. Refer to concurrent.futures.ThreadPoolExecutor + documentation for details. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + """ + if download_kwargs is None: download_kwargs = {} # We must know the size of the object, and the generation. @@ -144,6 +295,113 @@ def upload_many_from_filenames( deadline=None, raise_exception=False, ): + """Upload many files concurrently by their filenames. + + This function is a FEATURE PREVIEW: the API may change in a future version. + + The destination blobs are automatically created, with blob names based on + the source filenames and the prefix. + + For example, if the `filenames` include "images/icon.jpg", `root` is + "/home/myuser/", and `prefix` is "myfiles/", then the file at + "/home/myuser/images/icon.jpg" will be uploaded to a blob named + "myfiles/images/icon.jpg". + + :type bucket: 'google.cloud.storage.bucket.Bucket' + :param bucket: + The bucket which will contain the uploaded blobs. + + :type filenames: list(str) + :param filenames: + A list of filenames to be uploaded. This may include part of the path. + The full path to the file must be root + filename. The filename is + separate from the root because the filename will also determine the + name of the destination blob. + + :type root: str + :param root: + A string that will be prepended to each filename in the input list, in + order to find the source file for each blob. Unlike the filename itself, + the root string does not affect the name of the uploaded blob itself. + The root string will usually end in "/" (or "\\" depending on platform) + but is not required to do so. + + For instance, if the root string is "/tmp/img-" and a filename is + "0001.jpg", with an empty prefix, then the file uploaded will be + "/tmp/img-0001.jpg" and the destination blob will be "0001.jpg". + + This parameter can be an empty string, but has no default because almost + all use cases will use a non-empty root. + + :type prefix: str + :param prefix: + A string that will be prepended to each filename in the input list, in + order to determine the name of the destination blob. Unlike the filename + itself, the prefix string does not affect the location the library will + look for the source data on the local filesystem. + + For instance, if the root is "/tmp/img-", the prefix is + "myuser/mystuff-" and a filename is "0001.jpg" then the file uploaded + will be "/tmp/img-0001.jpg" and the destination blob will be + "myuser/mystuff-0001.jpg". + + The prefix can be blank (an empty string). + + :type skip_if_exists: bool + :param skip_if_exists: + If True, blobs that already have a live version will not be overwritten. + This is accomplished by setting "if_generation_match = 0" on uploads. + Uploads so skipped will result in a 412 Precondition Failed response + code, which will be included in the return value but not raised + as an exception regardless of the value of raise_exception. + + :type blob_constructor_kwargs: dict + :param blob_constructor_kwargs: + A dictionary of keyword arguments to pass to the blob constructor. 
Refer
+ to the documentation for blob.Blob() for more information. The dict is
+ directly passed into the constructor and is not validated by this
+ function. `name` and `bucket` keyword arguments are reserved by this
+ function and will result in an error if passed in here.
+
+ :type upload_kwargs: dict
+ :param upload_kwargs:
+ A dictionary of keyword arguments to pass to the upload method. Refer
+ to the documentation for blob.upload_from_file() or
+ blob.upload_from_filename() for more information. The dict is directly
+ passed into the upload methods and is not validated by this function.
+
+ :type max_workers: int
+ :param max_workers:
+ The number of workers (effectively, the number of threads) to use in
+ the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
+ documentation for details.
+
+ :type deadline: int
+ :param deadline:
+ The number of seconds to wait for all threads to resolve. If the
+ deadline is reached, all threads will be terminated regardless of their
+ progress and concurrent.futures.TimeoutError will be raised.
+
+ :type raise_exception: bool
+ :param raise_exception:
+ If True, instead of adding exceptions to the list of return values,
+ instead they will be raised. Note that encountering an exception on one
+ operation will not prevent other operations from starting. Exceptions
+ are only processed and potentially raised after all operations are
+ complete in success or failure.
+
+ If skip_if_exists is True, 412 Precondition Failed responses are
+ considered part of normal operation and are not raised as an exception.
+
+ :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
+
+ :rtype: list
+ :returns: A list of results corresponding to, in order, each item in the
+ input list. If an exception was received, it will be the result
+ for that operation. Otherwise, the return value from the successful
+ upload method is used (typically, None).
+ """
+
 file_blob_pairs = []

 for filename in filenames:
@@ -172,6 +430,93 @@ def download_many_to_path(
 deadline=None,
 raise_exception=False,
 ):
+ """Download many files concurrently by their blob names.
+
+ This function is a FEATURE PREVIEW: the API may change in a future version.
+
+ The destination files are automatically created, with filenames based on
+ the source blob_names and the path_root.
+
+ For example, if the `blob_names` include "icon.jpg", `path_root` is
+ "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named
+ "images/icon.jpg" will be downloaded to a file named
+ "/home/myuser/icon.jpg".
+
+ :type bucket: 'google.cloud.storage.bucket.Bucket'
+ :param bucket:
+ The bucket which contains the blobs to be downloaded.
+
+ :type blob_names: list(str)
+ :param blob_names:
+ A list of blob names to be downloaded. Each blob name will also be used
+ to determine the destination file path.
+
+ The full name of the source blob must be blob_name_prefix + blob_name.
+ The blob_name is separate from the blob_name_prefix because the blob_name
+ will also determine the name of the destination file. Any shared part of
+ the blob names that need not be part of the destination path should be
+ included in the blob_name_prefix.
+
+ :type path_root: str
+ :param path_root:
+ A string that will be prepended to each blob_name in the input list,
+ in order to determine the destination path for that blob. The path_root
+ string will usually end in "/" (or "\\" depending on platform) but is
+ not required to do so.
For instance, if the path_root string is + "/tmp/img-" and a blob_name is "0001.jpg", with an empty + blob_name_prefix, then the source blob "0001.jpg" will be downloaded to + destination "/tmp/img-0001.jpg" . This parameter can be an empty string, + but has no default because almost all use cases will use a non-blank + root. + + :type blob_name_prefix: str + :param blob_name_prefix: + A string that will be prepended to each blob_name in the input list, in + order to determine the name of the source blob. Unlike the blob_name + itself, the prefix string does not affect the destination path on the + local filesystem. For instance, if the path_root is "/tmp/img-", the + blob_name_prefix is "myuser/mystuff-" and a blob_name is "0001.jpg" then + the source blob "myuser/mystuff-0001.jpg" will be downloaded to + "/tmp/img-0001.jpg". The blob_name_prefix can be blank (an empty + string). + + :type download_kwargs: dict + :param download_kwargs: + A dictionary of keyword arguments to pass to the download method. Refer + to the documentation for blob.download_to_file() or + blob.download_to_filename() for more information. The dict is directly + passed into the download methods and is not validated by this function. + + :type max_workers: int + :param max_workers: + The number of workers (effectively, the number of threads) to use in + the worker pool. Refer to concurrent.futures.ThreadPoolExecutor + documentation for details. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. + + :type raise_exception: bool + :param raise_exception: + If True, instead of adding exceptions to the list of return values, + instead they will be raised. Note that encountering an exception on one + operation will not prevent other operations from starting. Exceptions + are only processed and potentially raised after all operations are + complete in success or failure. If skip_if_exists is True, 412 + Precondition Failed responses are considered part of normal operation + and are not raised as an exception. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + + :rtype: list + :returns: A list of results corresponding to, in order, each item in the + input list. If an exception was received, it will be the result + for that operation. Otherwise, the return value from the successful + download method is used (typically, None). + """ blob_file_pairs = [] for blob_name in blob_names: diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py new file mode 100644 index 000000000..efd0698bc --- /dev/null +++ b/tests/system/test_transfer_manager.py @@ -0,0 +1,100 @@ +# coding=utf-8 +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
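The path composition described for download_many_to_path above is plain string concatenation; here is a minimal runnable sketch of it, with names that are illustrative only and taken from the docstring example rather than from the patch:

blob_name_prefix = "images/"
path_root = "/home/myuser/"

for blob_name in ["icon.jpg", "logo.jpg"]:
    source_blob_name = blob_name_prefix + blob_name  # e.g. "images/icon.jpg"
    destination_path = path_root + blob_name         # e.g. "/home/myuser/icon.jpg"
    print(source_blob_name, "->", destination_path)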
+ +import tempfile + +from google.cloud.storage import transfer_manager + +from google.api_core import exceptions + + +def test_upload_many(shared_bucket, file_data, blobs_to_delete): + FILE_BLOB_PAIRS = [ + (file_data["simple"]["path"], shared_bucket.blob("simple1")), + (file_data["simple"]["path"], shared_bucket.blob("simple2")), + ] + + results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + assert results == [None, None] + + blobs = shared_bucket.list_blobs() + for blob in blobs: + if blob.name.startswith("simple"): + blobs_to_delete.append(blob) + assert len(blobs_to_delete) == 2 + + +def test_upload_many_with_file_objs(shared_bucket, file_data, blobs_to_delete): + FILE_BLOB_PAIRS = [ + (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple1")), + (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple2")), + ] + + results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + assert results == [None, None] + + blobs = shared_bucket.list_blobs() + for blob in blobs: + if blob.name.startswith("simple"): + blobs_to_delete.append(blob) + assert len(blobs_to_delete) == 2 + + +def test_upload_many_skip_if_exists( + listable_bucket, listable_filenames, file_data, blobs_to_delete +): + FILE_BLOB_PAIRS = [ + (file_data["logo"]["path"], listable_bucket.blob(listable_filenames[0])), + (file_data["simple"]["path"], listable_bucket.blob("simple")), + ] + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + ) + assert isinstance(results[0], exceptions.PreconditionFailed) + assert results[1] is None + + blobs = listable_bucket.list_blobs() + for blob in blobs: + if blob.name.startswith("simple"): + blobs_to_delete.append(blob) + assert len(blobs_to_delete) == 1 + + +def test_download_many(listable_bucket): + blobs = list(listable_bucket.list_blobs()) + tempfiles = [tempfile.TemporaryFile(), tempfile.TemporaryFile()] + BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles) + + results = transfer_manager.download_many(BLOB_FILE_PAIRS) + assert results == [None, None] + for fp in tempfiles: + assert fp.tell() != 0 + + +def test_download_chunks_concurrently_to_file( + shared_bucket, file_data, blobs_to_delete +): + blob = shared_bucket.blob("big") + blob.upload_from_filename(file_data["big"]["path"]) + blobs_to_delete.append(blob) + + blob.reload() + fp = tempfile.TemporaryFile() + result = transfer_manager.download_chunks_concurrently_to_file( + blob, fp, chunk_size=1024 * 1024 + ) + assert result is None + assert fp.tell() != 0 diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 63b09c99e..4b47fa610 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -14,6 +14,8 @@ from google.cloud.storage import transfer_manager +from google.api_core import exceptions + import io import tempfile import unittest @@ -25,7 +27,7 @@ def test_upload_many_with_filenames(self): FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] FAKE_CONTENT_TYPE = "text/fake" UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} FAKE_RESULT = "nothing to see here" for _, blob_mock in FILE_BLOB_PAIRS: @@ -48,7 +50,7 @@ def test_upload_many_with_file_objs(self): ] FAKE_CONTENT_TYPE = "text/fake" UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_not_generation_match": 0, **UPLOAD_KWARGS} + 
EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} FAKE_RESULT = "nothing to see here" for _, blob_mock in FILE_BLOB_PAIRS: @@ -97,6 +99,19 @@ def test_upload_many_raises_exceptions(self): with self.assertRaises(ConnectionError): transfer_manager.upload_many(FILE_BLOB_PAIRS, raise_exception=True) + def test_upload_many_suppresses_412_with_skip_if_exists(self): + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( + "412" + ) + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + ) + for result in results: + self.assertEqual(type(result), exceptions.PreconditionFailed) + def test_download_many_with_filenames(self): BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] FAKE_ENCODING = "fake_gzip" From 700c9404bf27a2cd40589e54c37f85b68cf79246 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 12 Oct 2022 15:08:26 -0700 Subject: [PATCH 08/19] Respond to review comments --- google/cloud/storage/transfer_manager.py | 60 +++++++++++++++--------- tests/unit/test_transfer_manager.py | 2 +- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 6a28c89e1..d157dbcd9 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Concurrent media operations. This is a FEATURE PREVIEW: API may change.""" +"""Concurrent media operations. This is a PREVIEW FEATURE: API may change.""" import concurrent.futures @@ -21,6 +21,9 @@ from google.api_core import exceptions +DEFAULT_CHUNK_SIZE = 200 * 1024 * 1024 + + def upload_many( file_blob_pairs, skip_if_exists=False, @@ -31,7 +34,7 @@ def upload_many( ): """Upload many files concurrently via a worker pool. - This function is a FEATURE PREVIEW: the API may change in a future version. + This function is a PREVIEW FEATURE: the API may change in a future version. :type file_blob_pairs: List(Tuple(IOBase or str, 'google.cloud.storage.blob.Blob')) :param file_blob_pairs: @@ -64,7 +67,8 @@ def upload_many( :param deadline: The number of seconds to wait for all threads to resolve. If the deadline is reached, all threads will be terminated regardless of their - progress and concurrent.futures.TimeoutError will be raised. + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. :type raise_exception: bool :param raise_exception: @@ -128,7 +132,7 @@ def download_many( ): """Download many blobs concurrently via a worker pool. - This function is a FEATURE PREVIEW: the API may change in a future version. + This function is a PREVIEW FEATURE: the API may change in a future version. :type blob_file_pairs: List(Tuple('google.cloud.storage.blob.Blob', IOBase or str)) :param blob_file_pairs: @@ -153,7 +157,8 @@ def download_many( :param deadline: The number of seconds to wait for all threads to resolve. If the deadline is reached, all threads will be terminated regardless of their - progress and concurrent.futures.TimeoutError will be raised. + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. 
:type raise_exception: bool :param raise_exception: @@ -200,19 +205,23 @@ def download_many( def download_chunks_concurrently_to_file( blob, file_obj, - chunk_size=200 * 1024 * 1024, + chunk_size=DEFAULT_CHUNK_SIZE, download_kwargs=None, max_workers=None, deadline=None, ): """Download a single blob in chunks, concurrently. - This function is a FEATURE PREVIEW: the API may change in a future version. + This function is a PREVIEW FEATURE: the API may change in a future version. Use of this function, in cases where single threads are unable to fully saturate available network bandwidth, may improve download performance for large objects. + The size of the blob must be known in order to calculate the number of + chunks. If the size is not already set, blob.reload() will be called + automatically to set it. + :type blob: 'google.cloud.storage.blob.Blob' :param blob: The blob to download. @@ -241,6 +250,13 @@ def download_chunks_concurrently_to_file( the worker pool. Refer to concurrent.futures.ThreadPoolExecutor documentation for details. + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. """ @@ -287,7 +303,7 @@ def upload_many_from_filenames( bucket, filenames, root, - prefix="", + blob_name_prefix="", skip_if_exists=False, blob_constructor_kwargs=None, upload_kwargs=None, @@ -297,13 +313,13 @@ def upload_many_from_filenames( ): """Upload many files concurrently by their filenames. - This function is a FEATURE PREVIEW: the API may change in a future version. + This function is a PREVIEW FEATURE: the API may change in a future version. The destination blobs are automatically created, with blob names based on - the source filenames and the prefix. + the source filenames and the blob_name_prefix. For example, if the `filenames` include "images/icon.jpg", `root` is - "/home/myuser/", and `prefix` is "myfiles/", then the file at + "/home/myuser/", and `blob_name_prefix` is "myfiles/", then the file at "/home/myuser/images/icon.jpg" will be uploaded to a blob named "myfiles/images/icon.jpg". @@ -327,25 +343,25 @@ def upload_many_from_filenames( but is not required to do so. For instance, if the root string is "/tmp/img-" and a filename is - "0001.jpg", with an empty prefix, then the file uploaded will be - "/tmp/img-0001.jpg" and the destination blob will be "0001.jpg". + "0001.jpg", with an empty blob_name_prefix, then the file uploaded will + be "/tmp/img-0001.jpg" and the destination blob will be "0001.jpg". This parameter can be an empty string, but has no default because almost all use cases will use a non-empty root. - :type prefix: str - :param prefix: + :type blob_name_prefix: str + :param blob_name_prefix: A string that will be prepended to each filename in the input list, in order to determine the name of the destination blob. Unlike the filename itself, the prefix string does not affect the location the library will look for the source data on the local filesystem. - For instance, if the root is "/tmp/img-", the prefix is + For instance, if the root is "/tmp/img-", the blob_name_prefix is "myuser/mystuff-" and a filename is "0001.jpg" then the file uploaded will be "/tmp/img-0001.jpg" and the destination blob will be "myuser/mystuff-0001.jpg". 
- The prefix can be blank (an empty string). + The blob_name_prefix can be blank (an empty string). :type skip_if_exists: bool :param skip_if_exists: @@ -380,7 +396,8 @@ def upload_many_from_filenames( :param deadline: The number of seconds to wait for all threads to resolve. If the deadline is reached, all threads will be terminated regardless of their - progress and concurrent.futures.TimeoutError will be raised. + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. :type raise_exception: bool :param raise_exception: @@ -406,7 +423,7 @@ def upload_many_from_filenames( for filename in filenames: path = root + filename - blob_name = prefix + filename + blob_name = blob_name_prefix + filename blob = bucket.blob(blob_name, **blob_constructor_kwargs) file_blob_pairs.append((path, blob)) @@ -432,7 +449,7 @@ def download_many_to_path( ): """Download many files concurrently by their blob names. - This function is a FEATURE PREVIEW: the API may change in a future version. + This function is a PREVIEW FEATURE: the API may change in a future version. The destination files are automatically created, with filenames based on the source blob_names and the path_root. @@ -497,7 +514,8 @@ def download_many_to_path( :param deadline: The number of seconds to wait for all threads to resolve. If the deadline is reached, all threads will be terminated regardless of their - progress and concurrent.futures.TimeoutError will be raised. + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. :type raise_exception: bool :param raise_exception: diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 4b47fa610..57095a0db 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -256,7 +256,7 @@ def test_upload_many_from_filenames(self): bucket, FILENAMES, ROOT, - prefix=PREFIX, + blob_name_prefix=PREFIX, skip_if_exists=True, blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, upload_kwargs=UPLOAD_KWARGS, From 45dd36850eb045def66721b79a5bded3c5a2c109 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 21 Oct 2022 12:43:38 -0700 Subject: [PATCH 09/19] verify md5 hash of downloaded file in test --- tests/system/test_transfer_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index efd0698bc..57f2464d7 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -16,6 +16,7 @@ import tempfile from google.cloud.storage import transfer_manager +from google.cloud.storage._helpers import _base64_md5hash from google.api_core import exceptions @@ -98,3 +99,6 @@ def test_download_chunks_concurrently_to_file( ) assert result is None assert fp.tell() != 0 + + fp.seek(0) + assert blob.md5_hash.encode('utf8') == _base64_md5hash(fp) From 53780ca6ee2c1dd919afcee28273ab289f7f9999 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 24 Oct 2022 10:10:58 -0700 Subject: [PATCH 10/19] lint --- tests/system/test_transfer_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 57f2464d7..99887f0e1 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -101,4 +101,4 @@ def test_download_chunks_concurrently_to_file( assert 
fp.tell() != 0 fp.seek(0) - assert blob.md5_hash.encode('utf8') == _base64_md5hash(fp) + assert blob.md5_hash.encode("utf8") == _base64_md5hash(fp) From 65871af8e16e1ba7bcb842e794de5acd518c0395 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 25 Oct 2022 11:40:42 -0700 Subject: [PATCH 11/19] default empty strings for root arguments --- google/cloud/storage/transfer_manager.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index d157dbcd9..b6f769a17 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -302,7 +302,7 @@ def download_range_via_tempfile(blob, start, end, download_kwargs): def upload_many_from_filenames( bucket, filenames, - root, + root="", blob_name_prefix="", skip_if_exists=False, blob_constructor_kwargs=None, @@ -346,8 +346,7 @@ def upload_many_from_filenames( "0001.jpg", with an empty blob_name_prefix, then the file uploaded will be "/tmp/img-0001.jpg" and the destination blob will be "0001.jpg". - This parameter can be an empty string, but has no default because almost - all use cases will use a non-empty root. + This parameter can be an empty string. :type blob_name_prefix: str :param blob_name_prefix: @@ -440,7 +439,7 @@ def upload_many_from_filenames( def download_many_to_path( bucket, blob_names, - path_root, + path_root="", blob_name_prefix="", download_kwargs=None, max_workers=None, @@ -482,9 +481,7 @@ def download_many_to_path( not required to do so. For instance, if the path_root string is "/tmp/img-" and a blob_name is "0001.jpg", with an empty blob_name_prefix, then the source blob "0001.jpg" will be downloaded to - destination "/tmp/img-0001.jpg" . This parameter can be an empty string, - but has no default because almost all use cases will use a non-blank - root. + destination "/tmp/img-0001.jpg" . This parameter can be an empty string. :type blob_name_prefix: str :param blob_name_prefix: From 5e7693f055c682a5e53e37a413b18b967746f5e1 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 25 Oct 2022 11:51:45 -0700 Subject: [PATCH 12/19] fix bug with blob constructor --- google/cloud/storage/transfer_manager.py | 2 ++ tests/unit/test_transfer_manager.py | 26 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index b6f769a17..85b6a4316 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -417,6 +417,8 @@ def upload_many_from_filenames( for that operation. Otherwise, the return value from the successful upload method is used (typically, None). 
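+
+ Example (a sketch only; the bucket handle, filenames, and prefixes are
+ illustrative)::
+
+ results = upload_many_from_filenames(
+ bucket,
+ ["0001.jpg", "0002.jpg"],
+ root="/tmp/img-",
+ blob_name_prefix="myuser/mystuff-",
+ skip_if_exists=True,
+ )
+ # results[i] is None on success, or the exception for that upload.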
""" + if blob_constructor_kwargs is None: + blob_constructor_kwargs = {} file_blob_pairs = [] diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 57095a0db..b48748018 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -276,6 +276,32 @@ def test_upload_many_from_filenames(self): bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) + def test_upload_many_from_filenames_minimal_args(self): + bucket = mock.Mock() + + FILENAMES = ["file_a.txt", "file_b.txt"] + + EXPECTED_FILE_BLOB_PAIRS = [(filename, mock.ANY) for filename in FILENAMES] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, + ) + + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=False, + upload_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=False, + ) + bucket.blob.assert_any_call(FILENAMES[0]) + bucket.blob.assert_any_call(FILENAMES[1]) + def test_download_many_to_path(self): bucket = mock.Mock() From e9ca76d6eebd7414e7a2b6d43b5e3bf46e1f4d7d Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 25 Oct 2022 13:24:18 -0700 Subject: [PATCH 13/19] add warning about files not being deleted if their downloads fail --- google/cloud/storage/transfer_manager.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 85b6a4316..162e6465d 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -140,6 +140,9 @@ def download_many( downloaded to the corresponding blob by using blob.download_to_file() or blob.download_to_filename() as appropriate. + Note that blob.download_to_filename() does not delete the destination + file if the download fails. + :type download_kwargs: dict :param download_kwargs: A dictionary of keyword arguments to pass to the download method. Refer @@ -455,6 +458,10 @@ def download_many_to_path( The destination files are automatically created, with filenames based on the source blob_names and the path_root. + The destination files are not automatically deleted if their downloads fail, + so please check the return value of this function for any exceptions, or + enable `raise_exception=True`, and process the files accordingly. 
+ For example, if the `blob_names` include "icon.jpg", `path_root` is "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named "images/icon.jpg" will be downloaded to a file named From a7ecd857ea9905fb19db127096359850a2b727d1 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 11:03:01 -0800 Subject: [PATCH 14/19] docs: Add samples to multithread branch (#918) * add samples, tests pending * add snippet tests * snippet and snippets_test.py linting * snippets; recursive directory creation; rename some params * Add directory upload snippet --- google/cloud/storage/transfer_manager.py | 108 ++++++---- samples/snippets/snippets_test.py | 135 ++++++++++-- samples/snippets/storage_transfer_manager.py | 208 +++++++++++++++++++ tests/unit/test_transfer_manager.py | 59 +++++- 4 files changed, 443 insertions(+), 67 deletions(-) create mode 100644 samples/snippets/storage_transfer_manager.py diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 162e6465d..dd2519229 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -16,10 +16,17 @@ import concurrent.futures +import os import tempfile +import warnings from google.api_core import exceptions +warnings.warn( + "The module `transfer_manager` is a preview feature. Functionality and API " + "may change. This warning will be removed in a future release." +) + DEFAULT_CHUNK_SIZE = 200 * 1024 * 1024 @@ -305,7 +312,7 @@ def download_range_via_tempfile(blob, start, end, download_kwargs): def upload_many_from_filenames( bucket, filenames, - root="", + source_directory="", blob_name_prefix="", skip_if_exists=False, blob_constructor_kwargs=None, @@ -321,10 +328,10 @@ def upload_many_from_filenames( The destination blobs are automatically created, with blob names based on the source filenames and the blob_name_prefix. - For example, if the `filenames` include "images/icon.jpg", `root` is - "/home/myuser/", and `blob_name_prefix` is "myfiles/", then the file at - "/home/myuser/images/icon.jpg" will be uploaded to a blob named - "myfiles/images/icon.jpg". + For example, if the `filenames` include "images/icon.jpg", + `source_directory` is "/home/myuser/", and `blob_name_prefix` is "myfiles/", + then the file at "/home/myuser/images/icon.jpg" will be uploaded to a blob + named "myfiles/images/icon.jpg". :type bucket: 'google.cloud.storage.bucket.Bucket' :param bucket: @@ -333,24 +340,24 @@ def upload_many_from_filenames( :type filenames: list(str) :param filenames: A list of filenames to be uploaded. This may include part of the path. - The full path to the file must be root + filename. The filename is - separate from the root because the filename will also determine the - name of the destination blob. + The full path to the file must be source_directory + filename. - :type root: str - :param root: - A string that will be prepended to each filename in the input list, in - order to find the source file for each blob. Unlike the filename itself, - the root string does not affect the name of the uploaded blob itself. - The root string will usually end in "/" (or "\\" depending on platform) - but is not required to do so. + :type source_directory: str + :param source_directory: + A string that will be prepended (with os.path.join()) to each filename + in the input list, in order to find the source file for each blob. + Unlike the filename itself, the source_directory does not affect the + name of the uploaded blob. 
- For instance, if the root string is "/tmp/img-" and a filename is + For instance, if the source_directory is "/tmp/img/" and a filename is "0001.jpg", with an empty blob_name_prefix, then the file uploaded will - be "/tmp/img-0001.jpg" and the destination blob will be "0001.jpg". + be "/tmp/img/0001.jpg" and the destination blob will be "0001.jpg". This parameter can be an empty string. + Note that this parameter allows directory traversal (e.g. "/", "../") + and is not intended for unsanitized end user input. + :type blob_name_prefix: str :param blob_name_prefix: A string that will be prepended to each filename in the input list, in @@ -358,10 +365,10 @@ def upload_many_from_filenames( itself, the prefix string does not affect the location the library will look for the source data on the local filesystem. - For instance, if the root is "/tmp/img-", the blob_name_prefix is - "myuser/mystuff-" and a filename is "0001.jpg" then the file uploaded - will be "/tmp/img-0001.jpg" and the destination blob will be - "myuser/mystuff-0001.jpg". + For instance, if the source_directory is "/tmp/img/", the + blob_name_prefix is "myuser/mystuff-" and a filename is "0001.jpg" then + the file uploaded will be "/tmp/img/0001.jpg" and the destination blob + will be "myuser/mystuff-0001.jpg". The blob_name_prefix can be blank (an empty string). @@ -370,7 +377,7 @@ def upload_many_from_filenames( If True, blobs that already have a live version will not be overwritten. This is accomplished by setting "if_generation_match = 0" on uploads. Uploads so skipped will result in a 412 Precondition Failed response - code, which will be included in the return value but not raised + code, which will be included in the return value, but not raised as an exception regardless of the value of raise_exception. :type blob_constructor_kwargs: dict @@ -426,7 +433,7 @@ def upload_many_from_filenames( file_blob_pairs = [] for filename in filenames: - path = root + filename + path = os.path.join(source_directory, filename) blob_name = blob_name_prefix + filename blob = bucket.blob(blob_name, **blob_constructor_kwargs) file_blob_pairs.append((path, blob)) @@ -444,26 +451,27 @@ def upload_many_from_filenames( def download_many_to_path( bucket, blob_names, - path_root="", + destination_directory="", blob_name_prefix="", download_kwargs=None, max_workers=None, deadline=None, + create_directories=True, raise_exception=False, ): """Download many files concurrently by their blob names. This function is a PREVIEW FEATURE: the API may change in a future version. - The destination files are automatically created, with filenames based on - the source blob_names and the path_root. + The destination files are automatically created, with paths based on the + source blob_names and the destination_directory. The destination files are not automatically deleted if their downloads fail, so please check the return value of this function for any exceptions, or enable `raise_exception=True`, and process the files accordingly. - For example, if the `blob_names` include "icon.jpg", `path_root` is - "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named + For example, if the `blob_names` include "icon.jpg", `destination_directory` + is "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named "images/icon.jpg" will be downloaded to a file named "/home/myuser/icon.jpg". @@ -482,26 +490,31 @@ def download_many_to_path( the blob names that need not be part of the destination path should be included in the blob_name_prefix. 
- :type path_root: str - :param path_root: - A string that will be prepended to each blob_name in the input list, - in order to determine the destination path for that blob. The path_root - string will usually end in "/" (or "\\" depending on platform) but is - not required to do so. For instance, if the path_root string is - "/tmp/img-" and a blob_name is "0001.jpg", with an empty - blob_name_prefix, then the source blob "0001.jpg" will be downloaded to - destination "/tmp/img-0001.jpg" . This parameter can be an empty string. + :type destination_directory: str + :param destination_directory: + A string that will be prepended (with os.path.join()) to each blob_name + in the input list, in order to determine the destination path for that + blob. + + For instance, if the destination_directory string is "/tmp/img" and a + blob_name is "0001.jpg", with an empty blob_name_prefix, then the source + blob "0001.jpg" will be downloaded to destination "/tmp/img/0001.jpg" . + + This parameter can be an empty string. + + Note that this parameter allows directory traversal (e.g. "/", "../") + and is not intended for unsanitized end user input. :type blob_name_prefix: str :param blob_name_prefix: A string that will be prepended to each blob_name in the input list, in order to determine the name of the source blob. Unlike the blob_name itself, the prefix string does not affect the destination path on the - local filesystem. For instance, if the path_root is "/tmp/img-", the - blob_name_prefix is "myuser/mystuff-" and a blob_name is "0001.jpg" then - the source blob "myuser/mystuff-0001.jpg" will be downloaded to - "/tmp/img-0001.jpg". The blob_name_prefix can be blank (an empty - string). + local filesystem. For instance, if the destination_directory is + "/tmp/img/", the blob_name_prefix is "myuser/mystuff-" and a blob_name + is "0001.jpg" then the source blob "myuser/mystuff-0001.jpg" will be + downloaded to "/tmp/img/0001.jpg". The blob_name_prefix can be blank + (an empty string). :type download_kwargs: dict :param download_kwargs: @@ -523,6 +536,12 @@ def download_many_to_path( progress and concurrent.futures.TimeoutError will be raised. This can be left as the default of None (no deadline) for most use cases. + :type create_directories: bool + :param create_directories: + If True, recursively create any directories that do not exist. For + instance, if downloading object "images/img001.png", create the + directory "images" before downloading. 
+ :type raise_exception: bool :param raise_exception: If True, instead of adding exceptions to the list of return values, @@ -545,7 +564,10 @@ def download_many_to_path( for blob_name in blob_names: full_blob_name = blob_name_prefix + blob_name - path = path_root + blob_name + path = os.path.join(destination_directory, blob_name) + if create_directories: + directory, _ = os.path.split(path) + os.makedirs(directory, exist_ok=True) blob_file_pairs.append((bucket.blob(full_blob_name), path)) return download_many( diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 9370ecbdd..83d34d918 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -72,6 +72,7 @@ import storage_set_bucket_default_kms_key import storage_set_client_endpoint import storage_set_metadata +import storage_transfer_manager import storage_upload_file import storage_upload_from_memory import storage_upload_from_stream @@ -124,8 +125,8 @@ def test_bucket(): def test_public_bucket(): # The new projects don't allow to make a bucket available to public, so # for some tests we need to use the old main project for now. - original_value = os.environ['GOOGLE_CLOUD_PROJECT'] - os.environ['GOOGLE_CLOUD_PROJECT'] = os.environ['MAIN_GOOGLE_CLOUD_PROJECT'] + original_value = os.environ["GOOGLE_CLOUD_PROJECT"] + os.environ["GOOGLE_CLOUD_PROJECT"] = os.environ["MAIN_GOOGLE_CLOUD_PROJECT"] bucket = None while bucket is None or bucket.exists(): storage_client = storage.Client() @@ -135,7 +136,7 @@ def test_public_bucket(): yield bucket bucket.delete(force=True) # Set the value back. - os.environ['GOOGLE_CLOUD_PROJECT'] = original_value + os.environ["GOOGLE_CLOUD_PROJECT"] = original_value @pytest.fixture(scope="module") @@ -255,7 +256,7 @@ def test_download_byte_range(test_blob): storage_download_byte_range.download_byte_range( test_blob.bucket.name, test_blob.name, 0, 4, dest_file.name ) - assert dest_file.read() == b'Hello' + assert dest_file.read() == b"Hello" def test_download_blob(test_blob): @@ -308,7 +309,8 @@ def test_delete_blob(test_blob): def test_make_blob_public(test_public_blob): storage_make_public.make_blob_public( - test_public_blob.bucket.name, test_public_blob.name) + test_public_blob.bucket.name, test_public_blob.name + ) r = requests.get(test_public_blob.public_url) assert r.text == "Hello, is it me you're looking for?" 
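The create_directories handling documented above amounts to joining each blob name onto the destination directory and creating any missing parent directories first; a standalone sketch with hypothetical names, mirroring the os.makedirs() loop in download_many_to_path:

import os

destination_directory = "/tmp/downloads"
blob_names = ["file_a.txt", "images/img001.png"]

for blob_name in blob_names:
    path = os.path.join(destination_directory, blob_name)
    directory, _ = os.path.split(path)
    # Recursively create e.g. "/tmp/downloads/images" before downloading.
    os.makedirs(directory, exist_ok=True)
    print("would download", blob_name, "to", path)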
@@ -340,7 +342,9 @@ def test_generate_upload_signed_url_v4(test_bucket, capsys): ) requests.put( - url, data=content, headers={"content-type": "application/octet-stream"}, + url, + data=content, + headers={"content-type": "application/octet-stream"}, ) bucket = storage.Client().bucket(test_bucket.name) @@ -447,16 +451,20 @@ def test_get_set_autoclass(new_bucket_obj, test_bucket, capsys): def test_bucket_lifecycle_management(test_bucket, capsys): - bucket = storage_enable_bucket_lifecycle_management.enable_bucket_lifecycle_management( - test_bucket + bucket = ( + storage_enable_bucket_lifecycle_management.enable_bucket_lifecycle_management( + test_bucket + ) ) out, _ = capsys.readouterr() assert "[]" in out assert "Lifecycle management is enable" in out assert len(list(bucket.lifecycle_rules)) > 0 - bucket = storage_disable_bucket_lifecycle_management.disable_bucket_lifecycle_management( - test_bucket + bucket = ( + storage_disable_bucket_lifecycle_management.disable_bucket_lifecycle_management( + test_bucket + ) ) out, _ = capsys.readouterr() assert "[]" in out @@ -512,7 +520,8 @@ def test_get_service_account(capsys): def test_download_public_file(test_public_blob): storage_make_public.make_blob_public( - test_public_blob.bucket.name, test_public_blob.name) + test_public_blob.bucket.name, test_public_blob.name + ) with tempfile.NamedTemporaryFile() as dest_file: storage_download_public_file.download_public_file( test_public_blob.bucket.name, test_public_blob.name, dest_file.name @@ -522,8 +531,10 @@ def test_download_public_file(test_public_blob): def test_define_bucket_website_configuration(test_bucket): - bucket = storage_define_bucket_website_configuration.define_bucket_website_configuration( - test_bucket.name, "index.html", "404.html" + bucket = ( + storage_define_bucket_website_configuration.define_bucket_website_configuration( + test_bucket.name, "index.html", "404.html" + ) ) website_val = {"mainPageSuffix": "index.html", "notFoundPage": "404.html"} @@ -586,7 +597,7 @@ def test_change_default_storage_class(test_bucket, capsys): ) out, _ = capsys.readouterr() assert "Default storage class for bucket" in out - assert bucket.storage_class == 'COLDLINE' + assert bucket.storage_class == "COLDLINE" def test_change_file_storage_class(test_blob, capsys): @@ -595,7 +606,7 @@ def test_change_file_storage_class(test_blob, capsys): ) out, _ = capsys.readouterr() assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out - assert blob.storage_class == 'NEARLINE' + assert blob.storage_class == "NEARLINE" def test_copy_file_archived_generation(test_blob): @@ -647,7 +658,99 @@ def test_batch_request(test_bucket): def test_storage_set_client_endpoint(capsys): - storage_set_client_endpoint.set_client_endpoint('https://storage.googleapis.com') + storage_set_client_endpoint.set_client_endpoint("https://storage.googleapis.com") out, _ = capsys.readouterr() assert "client initiated with endpoint: https://storage.googleapis.com" in out + + +def test_transfer_manager_snippets(test_bucket, capsys): + BLOB_NAMES = [ + "test.txt", + "test2.txt", + "blobs/test.txt", + "blobs/nesteddir/test.txt", + ] + BIG_BLOB_NAME = "bigblob.txt" + ALL_NAMES = BLOB_NAMES + [BIG_BLOB_NAME] + TEST_DATA_24_BYTES = b"I am a rather big blob! 
" + SIZE_MULTIPLIER = 1024 + + with tempfile.TemporaryDirectory() as uploads: + # Create dirs and nested dirs + for name in BLOB_NAMES: + relpath = os.path.dirname(name) + os.makedirs(os.path.join(uploads, relpath), exist_ok=True) + + # Create files with nested dirs to exercise directory handling. + for name in BLOB_NAMES: + with open(os.path.join(uploads, name), "w") as f: + f.write(name) + # Also create one somewhat bigger file. + with open(os.path.join(uploads, BIG_BLOB_NAME), "wb") as f: + f.write(TEST_DATA_24_BYTES * SIZE_MULTIPLIER) + + storage_transfer_manager.upload_many_blobs_with_transfer_manager( + test_bucket.name, ALL_NAMES, source_directory="{}/".format(uploads) + ) + out, _ = capsys.readouterr() + + for name in ALL_NAMES: + assert "Uploaded {}".format(name) in out + + with tempfile.TemporaryDirectory() as downloads: + # First let's download the bigger file in chunks. + big_destination_path = os.path.join(downloads, "chunkeddl.txt") + storage_transfer_manager.download_blob_chunks_concurrently_with_transfer_manager( + test_bucket.name, + BIG_BLOB_NAME, + big_destination_path, + chunk_size=SIZE_MULTIPLIER, + ) + out, _ = capsys.readouterr() + + assert ( + "Downloaded {} to {} in {} chunk(s).".format( + BIG_BLOB_NAME, big_destination_path, len(TEST_DATA_24_BYTES) + ) + in out + ) + + # Now all the smaller files, plus the big file again because it's + # still in the bucket. + storage_transfer_manager.download_all_blobs_with_transfer_manager( + test_bucket.name, destination_directory=os.path.join(downloads, "") + ) + out, _ = capsys.readouterr() + + for name in ALL_NAMES: + assert "Downloaded {}".format(name) in out + + +def test_transfer_manager_directory_upload(test_bucket, capsys): + BLOB_NAMES = [ + "dirtest/test.txt", + "dirtest/test2.txt", + "dirtest/blobs/test.txt", + "dirtest/blobs/nesteddir/test.txt", + ] + + with tempfile.TemporaryDirectory() as uploads: + # Create dirs and nested dirs + for name in BLOB_NAMES: + relpath = os.path.dirname(name) + os.makedirs(os.path.join(uploads, relpath), exist_ok=True) + + # Create files with nested dirs to exercise directory handling. + for name in BLOB_NAMES: + with open(os.path.join(uploads, name), "w") as f: + f.write(name) + + storage_transfer_manager.upload_directory_with_transfer_manager( + test_bucket.name, directory="{}/".format(uploads) + ) + out, _ = capsys.readouterr() + + assert "Found {}".format(len(BLOB_NAMES)) in out + for name in BLOB_NAMES: + assert "Uploaded {}".format(name) in out diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py new file mode 100644 index 000000000..34502711d --- /dev/null +++ b/samples/snippets/storage_transfer_manager.py @@ -0,0 +1,208 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def upload_many_blobs_with_transfer_manager( + bucket_name, filenames, source_directory="" +): + """Upload every file in a list to a bucket, concurrently in a thread pool. 
+ + Each blob name is derived from the filename, not including the + `source_directory` parameter. For complete control of the blob name for each + file (and other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # A list (or other iterable) of filenames to upload. + # filenames = ["file_1.txt", "file_2.txt"] + + # The directory on your computer that is the root of all of the files in the + # list of filenames. This string is prepended (with os.path.join()) to each + # filename to get the full path to the file. Relative paths and absolute + # paths are both accepted. This string is not included in the name of the + # uploaded blob; it is only used to find the source files. An empty string + # means "the current working directory". Note that this parameter allows + # directory traversal (e.g. "/", "../") and is not intended for unsanitized + # end user input. + # source_directory="" + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + results = transfer_manager.upload_many_from_filenames( + bucket, filenames, source_directory=source_directory + ) + + for name, result in zip(filenames, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("Uploaded {} to {}.".format(name, bucket.name)) + + +def upload_directory_with_transfer_manager(bucket_name, directory): + """Upload every file in a directory, including all files in subdirectories. + + Each blob name is derived from the filename, not including the `directory` + parameter itself. For complete control of the blob name for each file (and + other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The directory on your computer to upload. Files in the directory and its + # subdirectories will be uploaded. An empty string means "the current + # working directory". + # directory="" + + from pathlib import Path + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + # Generate a list of paths (in string form) relative to the `directory`. + # This can be done in a single list comprehension, but is expanded into + # multiple lines here for clarity. + + # First, recursively get all files in `directory` as Path objects. + directory_as_path_obj = Path(directory) + paths = directory_as_path_obj.rglob("*") + + # Filter so the list only includes files, not directories themselves. + file_paths = [path for path in paths if path.is_file()] + + # These paths are relative to the current working directory. Next, make them + # relative to `directory` + relative_paths = [path.relative_to(directory) for path in file_paths] + + # Finally, convert them all to strings. + string_paths = [str(path) for path in relative_paths] + + print("Found {} files.".format(len(string_paths))) + + # Start the upload. + results = transfer_manager.upload_many_from_filenames( + bucket, string_paths, source_directory=directory + ) + + for name, result in zip(string_paths, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. 
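+ # (With the default raise_exception=False, upload_many_from_filenames
+ # reports failures by returning the exception object in `results`
+ # rather than raising it, hence the isinstance check below.)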
+
+ if isinstance(result, Exception):
+ print("Failed to upload {} due to exception: {}".format(name, result))
+ else:
+ print("Uploaded {} to {}.".format(name, bucket.name))
+
+
+def download_all_blobs_with_transfer_manager(bucket_name, destination_directory=""):
+ """Download all of the blobs in a bucket, concurrently in a thread pool.
+
+ The filename of each blob once downloaded is derived from the blob name and
+ the `destination_directory` parameter. For complete control of the filename
+ of each blob, use transfer_manager.download_many() instead.
+
+ Directories will be created automatically as needed, for instance to
+ accommodate blob names that include slashes.
+ """
+
+ # The ID of your GCS bucket
+ # bucket_name = "your-bucket-name"
+
+ # The directory on your computer to which to download all of the files. This
+ # string is prepended (with os.path.join()) to the name of each blob to form
+ # the full path. Relative paths and absolute paths are both accepted. An
+ # empty string means "the current working directory". Note that this
+ # parameter accepts directory traversal ("../" etc.) and is not
+ # intended for unsanitized end user input.
+ # destination_directory = ""
+
+ from google.cloud.storage import Client, transfer_manager
+
+ storage_client = Client()
+ bucket = storage_client.bucket(bucket_name)
+
+ blob_names = [blob.name for blob in bucket.list_blobs()]
+
+ results = transfer_manager.download_many_to_path(
+ bucket, blob_names, destination_directory=destination_directory
+ )
+
+ for name, result in zip(blob_names, results):
+ # The results list is either `None` or an exception for each blob in
+ # the input list, in order.
+
+ if isinstance(result, Exception):
+ print("Failed to download {} due to exception: {}".format(name, result))
+ else:
+ print("Downloaded {} to {}.".format(name, destination_directory + name))
+
+
+def download_blob_chunks_concurrently_with_transfer_manager(
+ bucket_name, blob_name, local_filename, chunk_size=200 * 1024 * 1024
+):
+ """Download a single blob, in chunks, concurrently in a thread pool.
+
+ This is intended for use with very large blobs."""
+
+ # The ID of your GCS bucket
+ # bucket_name = "your-bucket-name"
+
+ # The name of the blob to download.
+ # blob_name = "your-blob.txt"
+
+ # The filename (or path) on your computer to which to download the blob.
+ # local_filename = "your-file.txt"
+
+ # The size of each chunk. For instance, if the chunk size is 200 megabytes,
+ # a 1.6 gigabyte file would be downloaded in eight pieces concurrently.
+ # chunk_size = 200 * 1024 * 1024 # 200 MiB.
+
+ from google.cloud.storage import Client, transfer_manager
+
+ storage_client = Client()
+ bucket = storage_client.bucket(bucket_name)
+
+ blob = bucket.blob(blob_name)
+
+ # Open the local file in binary write mode.
+ local_file = open(local_filename, "wb")
+
+ # Unlike other transfer manager functions, which handle multiple files and
+ # return exceptions in a list, this function will simply raise any exception
+ # it encounters, and has no return value.
+ transfer_manager.download_chunks_concurrently_to_file(
+ blob, local_file, chunk_size=chunk_size
+ )
+
+ # If we've gotten this far, it must have been successful.
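+ # (The expression -(a // -b) below is ceiling division written with
+ # floor division, e.g. -(25 // -10) == 3; it avoids floating point.)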
+ + number_of_chunks = -(blob.size // -chunk_size) # Ceiling division + print( + "Downloaded {} to {} in {} chunk(s).".format( + blob_name, local_file.name, number_of_chunks + ) + ) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index b48748018..9f90337a5 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from google.cloud.storage import transfer_manager +import pytest + +with pytest.warns(UserWarning): + from google.cloud.storage import transfer_manager from google.api_core import exceptions +import os import io import tempfile import unittest @@ -246,7 +250,7 @@ def test_upload_many_from_filenames(self): DEADLINE = 10 EXPECTED_FILE_BLOB_PAIRS = [ - (ROOT + filename, mock.ANY) for filename in FILENAMES + (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES ] with mock.patch( @@ -255,7 +259,7 @@ def test_upload_many_from_filenames(self): transfer_manager.upload_many_from_filenames( bucket, FILENAMES, - ROOT, + source_directory=ROOT, blob_name_prefix=PREFIX, skip_if_exists=True, blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, @@ -305,7 +309,7 @@ def test_upload_many_from_filenames_minimal_args(self): def test_download_many_to_path(self): bucket = mock.Mock() - BLOBNAMES = ["file_a.txt", "file_b.txt"] + BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] PATH_ROOT = "mypath/" BLOB_NAME_PREFIX = "myprefix/" DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} @@ -313,7 +317,7 @@ def test_download_many_to_path(self): DEADLINE = 10 EXPECTED_BLOB_FILE_PAIRS = [ - (mock.ANY, PATH_ROOT + blobname) for blobname in BLOBNAMES + (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES ] with mock.patch( @@ -322,11 +326,12 @@ def test_download_many_to_path(self): transfer_manager.download_many_to_path( bucket, BLOBNAMES, - PATH_ROOT, + destination_directory=PATH_ROOT, blob_name_prefix=BLOB_NAME_PREFIX, download_kwargs=DOWNLOAD_KWARGS, max_workers=MAX_WORKERS, deadline=DEADLINE, + create_directories=False, raise_exception=True, ) @@ -337,5 +342,43 @@ def test_download_many_to_path(self): deadline=DEADLINE, raise_exception=True, ) - bucket.blob.assert_any_call(BLOB_NAME_PREFIX + BLOBNAMES[0]) - bucket.blob.assert_any_call(BLOB_NAME_PREFIX + BLOBNAMES[1]) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) + + def test_download_many_to_path_creates_directories(self): + bucket = mock.Mock() + + with tempfile.TemporaryDirectory() as tempdir: + DIR_NAME = "dir_a/dir_b" + BLOBNAMES = [ + "file_a.txt", + "file_b.txt", + os.path.join(DIR_NAME, "file_c.txt"), + ] + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + destination_directory=tempdir, + create_directories=True, + raise_exception=True, + ) + + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=None, + max_workers=None, + deadline=None, + raise_exception=True, + ) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(blobname) + + assert os.path.isdir(os.path.join(tempdir, DIR_NAME)) From d6b44057d09b139e1a75bc032883017555557cda Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 11:28:06 -0800 
Subject: [PATCH 15/19] fix: remove chunked downloads; change max_workers to threads
---
 google/cloud/storage/transfer_manager.py     | 174 +++++--------------
 samples/snippets/snippets_test.py            |  31 +---
 samples/snippets/storage_transfer_manager.py |  47 -----
 tests/system/test_transfer_manager.py        |  20 ---
 tests/unit/test_transfer_manager.py          |  65 +------
 5 files changed, 60 insertions(+), 277 deletions(-)

diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py
index dd2519229..e87f0cc76 100644
--- a/google/cloud/storage/transfer_manager.py
+++ b/google/cloud/storage/transfer_manager.py
@@ -17,7 +17,6 @@
 
 import concurrent.futures
 import os
-import tempfile
 import warnings
 
 from google.api_core import exceptions
@@ -35,7 +34,7 @@ def upload_many(
     file_blob_pairs,
     skip_if_exists=False,
     upload_kwargs=None,
-    max_workers=None,
+    threads=4,
     deadline=None,
     raise_exception=False,
 ):
@@ -64,11 +63,16 @@
         blob.upload_from_filename() for more information. The dict is directly
         passed into the upload methods and is not validated by this function.
 
-    :type max_workers: int
-    :param max_workers:
-        The number of workers (effectively, the number of threads) to use in
-        the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
-        documentation for details.
+    :type threads: int
+    :param threads:
+        The number of threads to use in the worker pool. This is passed to
+        `concurrent.futures.ThreadPoolExecutor` as the `max_workers` param;
+        refer to standard library documentation for details.
+
+        The performance impact of this value depends on the use case, but
+        generally, smaller files benefit from more threads and larger files
+        don't benefit from more threads. Too many threads can slow operations,
+        especially with large files, due to contention over the Python GIL.
 
     :type deadline: int
     :param deadline:
@@ -101,7 +105,7 @@
     if skip_if_exists:
         upload_kwargs["if_generation_match"] = 0
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
         futures = []
         for path_or_file, blob in file_blob_pairs:
             method = (
@@ -133,7 +137,7 @@
 def download_many(
     blob_file_pairs,
     download_kwargs=None,
-    max_workers=None,
+    threads=4,
     deadline=None,
     raise_exception=False,
 ):
@@ -157,11 +161,16 @@
         blob.download_to_filename() for more information. The dict is directly
         passed into the download methods and is not validated by this function.
 
-    :type max_workers: int
-    :param max_workers:
-        The number of workers (effectively, the number of threads) to use in
-        the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
-        documentation for details.
+    :type threads: int
+    :param threads:
+        The number of threads to use in the worker pool. This is passed to
+        `concurrent.futures.ThreadPoolExecutor` as the `max_workers` param;
+        refer to standard library documentation for details.
+
+        The performance impact of this value depends on the use case, but
+        generally, smaller files benefit from more threads and larger files
+        don't benefit from more threads. Too many threads can slow operations,
+        especially with large files, due to contention over the Python GIL.
:type deadline: int :param deadline: @@ -189,7 +198,7 @@ def download_many( if download_kwargs is None: download_kwargs = {} - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: futures = [] for blob, path_or_file in blob_file_pairs: method = ( @@ -212,103 +221,6 @@ def download_many( return results -def download_chunks_concurrently_to_file( - blob, - file_obj, - chunk_size=DEFAULT_CHUNK_SIZE, - download_kwargs=None, - max_workers=None, - deadline=None, -): - """Download a single blob in chunks, concurrently. - - This function is a PREVIEW FEATURE: the API may change in a future version. - - Use of this function, in cases where single threads are unable to fully - saturate available network bandwidth, may improve download performance for - large objects. - - The size of the blob must be known in order to calculate the number of - chunks. If the size is not already set, blob.reload() will be called - automatically to set it. - - :type blob: 'google.cloud.storage.blob.Blob' - :param blob: - The blob to download. - - :type file_obj: IOBase - :param file_obj: The file object to which the downloaded chunks will be - written. Chunks are written in order. While the current implementation - of this function does not use seek(), a future version may use seek() to - write chunks out of order to improve write performance. - - :type chunk_size: int - :param chunk_size: The size of each chunk. An excessively small size may - have a negative performance impact, as each chunk will be uploaded in a - separate HTTP request. - - :type download_kwargs: dict - :param download_kwargs: - A dictionary of keyword arguments to pass to the download method. Refer - to the documentation for blob.download_to_file() or - blob.download_to_filename() for more information. The dict is directly - passed into the download methods and is not validated by this function. - - :type max_workers: int - :param max_workers: - The number of workers (effectively, the number of threads) to use in - the worker pool. Refer to concurrent.futures.ThreadPoolExecutor - documentation for details. - - :type deadline: int - :param deadline: - The number of seconds to wait for all threads to resolve. If the - deadline is reached, all threads will be terminated regardless of their - progress and concurrent.futures.TimeoutError will be raised. This can be - left as the default of None (no deadline) for most use cases. - - :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. - """ - - if download_kwargs is None: - download_kwargs = {} - # We must know the size of the object, and the generation. - if not blob.size or not blob.generation: - blob.reload() - - def download_range_via_tempfile(blob, start, end, download_kwargs): - tmp = tempfile.TemporaryFile() - blob.download_to_file(tmp, start=start, end=end, **download_kwargs) - return tmp - - futures = [] - - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - cursor = 0 - while cursor < blob.size: - start = cursor - cursor = min(cursor + chunk_size, blob.size) - futures.append( - executor.submit( - download_range_via_tempfile, - blob, - start=start, - end=cursor - 1, - download_kwargs=download_kwargs, - ) - ) - - # Wait until all futures are done and process them in order. 
-        concurrent.futures.wait(
-            futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
-        )
-        for future in futures:
-            tmp = future.result()
-            tmp.seek(0)
-            file_obj.write(tmp.read())
-            tmp.close()
-
-
 def upload_many_from_filenames(
     bucket,
     filenames,
@@ -317,7 +229,7 @@
     skip_if_exists=False,
     blob_constructor_kwargs=None,
     upload_kwargs=None,
-    max_workers=None,
+    threads=4,
     deadline=None,
     raise_exception=False,
 ):
@@ -395,11 +307,16 @@
         blob.upload_from_filename() for more information. The dict is directly
         passed into the upload methods and is not validated by this function.
 
-    :type max_workers: int
-    :param max_workers:
-        The number of workers (effectively, the number of threads) to use in
-        the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
-        documentation for details.
+    :type threads: int
+    :param threads:
+        The number of threads to use in the worker pool. This is passed to
+        `concurrent.futures.ThreadPoolExecutor` as the `max_workers` param;
+        refer to standard library documentation for details.
+
+        The performance impact of this value depends on the use case, but
+        generally, smaller files benefit from more threads and larger files
+        don't benefit from more threads. Too many threads can slow operations,
+        especially with large files, due to contention over the Python GIL.
 
     :type deadline: int
     :param deadline:
@@ -442,7 +359,7 @@
         file_blob_pairs,
         skip_if_exists=skip_if_exists,
         upload_kwargs=upload_kwargs,
-        max_workers=max_workers,
+        threads=threads,
         deadline=deadline,
         raise_exception=raise_exception,
     )
@@ -454,7 +371,7 @@ def download_many_to_path(
     destination_directory="",
     blob_name_prefix="",
     download_kwargs=None,
-    max_workers=None,
+    threads=4,
     deadline=None,
     create_directories=True,
     raise_exception=False,
@@ -523,11 +440,16 @@
         blob.download_to_filename() for more information. The dict is directly
         passed into the download methods and is not validated by this function.
 
-    :type max_workers: int
-    :param max_workers:
-        The number of workers (effectively, the number of threads) to use in
-        the worker pool. Refer to concurrent.futures.ThreadPoolExecutor
-        documentation for details.
+    :type threads: int
+    :param threads:
+        The number of threads to use in the worker pool. This is passed to
+        `concurrent.futures.ThreadPoolExecutor` as the `max_workers` param;
+        refer to standard library documentation for details.
+
+        The performance impact of this value depends on the use case, but
+        generally, smaller files benefit from more threads and larger files
+        don't benefit from more threads. Too many threads can slow operations,
+        especially with large files, due to contention over the Python GIL.
 
     :type deadline: int
     :param deadline:
@@ -573,7 +495,7 @@
     return download_many(
         blob_file_pairs,
         download_kwargs=download_kwargs,
-        max_workers=max_workers,
+        threads=threads,
         deadline=deadline,
         raise_exception=raise_exception,
     )
diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py
index 83d34d918..aba90ee1f 100644
--- a/samples/snippets/snippets_test.py
+++ b/samples/snippets/snippets_test.py
@@ -671,8 +671,6 @@ def test_transfer_manager_snippets(test_bucket, capsys):
         "blobs/test.txt",
         "blobs/nesteddir/test.txt",
     ]
-    BIG_BLOB_NAME = "bigblob.txt"
-    ALL_NAMES = BLOB_NAMES + [BIG_BLOB_NAME]
     TEST_DATA_24_BYTES = b"I am a rather big blob! 
" SIZE_MULTIPLIER = 1024 @@ -686,44 +684,23 @@ def test_transfer_manager_snippets(test_bucket, capsys): for name in BLOB_NAMES: with open(os.path.join(uploads, name), "w") as f: f.write(name) - # Also create one somewhat bigger file. - with open(os.path.join(uploads, BIG_BLOB_NAME), "wb") as f: - f.write(TEST_DATA_24_BYTES * SIZE_MULTIPLIER) storage_transfer_manager.upload_many_blobs_with_transfer_manager( - test_bucket.name, ALL_NAMES, source_directory="{}/".format(uploads) + test_bucket.name, BLOB_NAMES, source_directory="{}/".format(uploads) ) out, _ = capsys.readouterr() - for name in ALL_NAMES: + for name in BLOB_NAMES: assert "Uploaded {}".format(name) in out with tempfile.TemporaryDirectory() as downloads: - # First let's download the bigger file in chunks. - big_destination_path = os.path.join(downloads, "chunkeddl.txt") - storage_transfer_manager.download_blob_chunks_concurrently_with_transfer_manager( - test_bucket.name, - BIG_BLOB_NAME, - big_destination_path, - chunk_size=SIZE_MULTIPLIER, - ) - out, _ = capsys.readouterr() - - assert ( - "Downloaded {} to {} in {} chunk(s).".format( - BIG_BLOB_NAME, big_destination_path, len(TEST_DATA_24_BYTES) - ) - in out - ) - - # Now all the smaller files, plus the big file again because it's - # still in the bucket. + # Download the files. storage_transfer_manager.download_all_blobs_with_transfer_manager( test_bucket.name, destination_directory=os.path.join(downloads, "") ) out, _ = capsys.readouterr() - for name in ALL_NAMES: + for name in BLOB_NAMES: assert "Downloaded {}".format(name) in out diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index 34502711d..e5fc55df0 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -159,50 +159,3 @@ def download_all_blobs_with_transfer_manager(bucket_name, destination_directory= print("Failed to download {} due to exception: {}".format(name, result)) else: print("Downloaded {} to {}.".format(name, destination_directory + name)) - - -def download_blob_chunks_concurrently_with_transfer_manager( - bucket_name, blob_name, local_filename, chunk_size=200 * 1024 * 1024 -): - """Download a single blob, in chunks, concurrently in a thread pool. - - This is intended for use with very large blobs.""" - - # The ID of your GCS bucket - # bucket_name = "your-bucket-name" - - # The name of the blob to download. - # blob_name = "your-blob.txt" - - # The filename (or path) on your computer to which to download the blob. - # local_filename = "your-file.txt" - - # The size of each chunk. For instance, if the chunk size is 200 megabytes, - # a 1.6 gigabyte file would be downloaded in eight pieces concurrently. - # chunk_size = 200 * 1024 * 1024 # 200 MiB. - - from google.cloud.storage import Client, transfer_manager - - storage_client = Client() - bucket = storage_client.bucket(bucket_name) - - blob = bucket.blob(blob_name) - - # Open the local file in binary write mode. - local_file = open(local_filename, "wb") - - # Unlike other transfer manager functions, which handle multiple files and - # return exceptions in a list, this function will simply raise any exception - # it encounters, and has no return value. - transfer_manager.download_chunks_concurrently_to_file( - blob, local_file, chunk_size=chunk_size - ) - - # If we've gotten this far, it must have been successful. 
- - number_of_chunks = -(blob.size // -chunk_size) # Ceiling division - print( - "Downloaded {} to {} in {} chunk(s).".format( - blob_name, local_file.name, number_of_chunks - ) - ) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 99887f0e1..0b639170d 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -16,7 +16,6 @@ import tempfile from google.cloud.storage import transfer_manager -from google.cloud.storage._helpers import _base64_md5hash from google.api_core import exceptions @@ -83,22 +82,3 @@ def test_download_many(listable_bucket): assert results == [None, None] for fp in tempfiles: assert fp.tell() != 0 - - -def test_download_chunks_concurrently_to_file( - shared_bucket, file_data, blobs_to_delete -): - blob = shared_bucket.blob("big") - blob.upload_from_filename(file_data["big"]["path"]) - blobs_to_delete.append(blob) - - blob.reload() - fp = tempfile.TemporaryFile() - result = transfer_manager.download_chunks_concurrently_to_file( - blob, fp, chunk_size=1024 * 1024 - ) - assert result is None - assert fp.tell() != 0 - - fp.seek(0) - assert blob.md5_hash.encode("utf8") == _base64_md5hash(fp) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 9f90337a5..f52d5471b 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -20,7 +20,6 @@ from google.api_core import exceptions import os -import io import tempfile import unittest import mock @@ -79,7 +78,7 @@ def test_upload_many_passes_concurrency_options(self): "concurrent.futures.ThreadPoolExecutor" ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: transfer_manager.upload_many( - FILE_BLOB_PAIRS, max_workers=MAX_WORKERS, deadline=DEADLINE + FILE_BLOB_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE ) pool_patch.assert_called_with(max_workers=MAX_WORKERS) wait_patch.assert_called_with( @@ -164,7 +163,7 @@ def test_download_many_passes_concurrency_options(self): "concurrent.futures.ThreadPoolExecutor" ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: transfer_manager.download_many( - BLOB_FILE_PAIRS, max_workers=MAX_WORKERS, deadline=DEADLINE + BLOB_FILE_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE ) pool_patch.assert_called_with(max_workers=MAX_WORKERS) wait_patch.assert_called_with( @@ -189,54 +188,6 @@ def test_download_many_raises_exceptions(self): with self.assertRaises(ConnectionError): transfer_manager.download_many(BLOB_FILE_PAIRS, raise_exception=True) - def test_download_chunks_concurrently_to_file(self): - BLOB_CONTENTS = b"1234567812345678A" - blob = mock.Mock() - blob.size = len(BLOB_CONTENTS) - blob.generation = None - - FAKE_ENCODING = "fake-gzip" - DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} - - def fake_download_to_file(file_obj, start, end, **kwargs): - file_obj.write(BLOB_CONTENTS[start : end + 1]) - self.assertEqual(kwargs, DOWNLOAD_KWARGS) - - blob.download_to_file = fake_download_to_file - - file_obj = io.BytesIO() - - transfer_manager.download_chunks_concurrently_to_file( - blob, file_obj, chunk_size=4, download_kwargs=DOWNLOAD_KWARGS - ) - - # Generation wasn't set, so reload should have been called. 
- blob.reload.assert_called_with() - - file_obj.seek(0) - result = file_obj.read() - self.assertEqual(result, BLOB_CONTENTS) - - def test_download_chunks_passes_concurrency_arguments_and_kwargs(self): - blob = mock.Mock() - blob.size = 17 - blob.generation = 1 - - file_obj = mock.Mock() - - MAX_WORKERS = 7 - DEADLINE = 10 - with mock.patch( - "concurrent.futures.ThreadPoolExecutor" - ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: - transfer_manager.download_chunks_concurrently_to_file( - blob, file_obj, chunk_size=4, max_workers=MAX_WORKERS, deadline=DEADLINE - ) - pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with( - mock.ANY, timeout=DEADLINE, return_when=mock.ANY - ) - def test_upload_many_from_filenames(self): bucket = mock.Mock() @@ -264,7 +215,7 @@ def test_upload_many_from_filenames(self): skip_if_exists=True, blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, upload_kwargs=UPLOAD_KWARGS, - max_workers=MAX_WORKERS, + threads=MAX_WORKERS, deadline=DEADLINE, raise_exception=True, ) @@ -273,7 +224,7 @@ def test_upload_many_from_filenames(self): EXPECTED_FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS, - max_workers=MAX_WORKERS, + threads=MAX_WORKERS, deadline=DEADLINE, raise_exception=True, ) @@ -299,7 +250,7 @@ def test_upload_many_from_filenames_minimal_args(self): EXPECTED_FILE_BLOB_PAIRS, skip_if_exists=False, upload_kwargs=None, - max_workers=None, + threads=4, deadline=None, raise_exception=False, ) @@ -329,7 +280,7 @@ def test_download_many_to_path(self): destination_directory=PATH_ROOT, blob_name_prefix=BLOB_NAME_PREFIX, download_kwargs=DOWNLOAD_KWARGS, - max_workers=MAX_WORKERS, + threads=MAX_WORKERS, deadline=DEADLINE, create_directories=False, raise_exception=True, @@ -338,7 +289,7 @@ def test_download_many_to_path(self): mock_download_many.assert_called_once_with( EXPECTED_BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS, - max_workers=MAX_WORKERS, + threads=MAX_WORKERS, deadline=DEADLINE, raise_exception=True, ) @@ -374,7 +325,7 @@ def test_download_many_to_path_creates_directories(self): mock_download_many.assert_called_once_with( EXPECTED_BLOB_FILE_PAIRS, download_kwargs=None, - max_workers=None, + threads=4, deadline=None, raise_exception=True, ) From d04d10598be0a59c3842ae9565956670ab9d5b30 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 11:42:58 -0800 Subject: [PATCH 16/19] update snippets to add thread info --- samples/snippets/snippets_test.py | 11 +++--- samples/snippets/storage_transfer_manager.py | 35 ++++++++++++++++---- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index aba90ee1f..2ec02589d 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -671,8 +671,6 @@ def test_transfer_manager_snippets(test_bucket, capsys): "blobs/test.txt", "blobs/nesteddir/test.txt", ] - TEST_DATA_24_BYTES = b"I am a rather big blob! 
" - SIZE_MULTIPLIER = 1024 with tempfile.TemporaryDirectory() as uploads: # Create dirs and nested dirs @@ -686,7 +684,10 @@ def test_transfer_manager_snippets(test_bucket, capsys): f.write(name) storage_transfer_manager.upload_many_blobs_with_transfer_manager( - test_bucket.name, BLOB_NAMES, source_directory="{}/".format(uploads) + test_bucket.name, + BLOB_NAMES, + source_directory="{}/".format(uploads), + threads=2, ) out, _ = capsys.readouterr() @@ -696,7 +697,9 @@ def test_transfer_manager_snippets(test_bucket, capsys): with tempfile.TemporaryDirectory() as downloads: # Download the files. storage_transfer_manager.download_all_blobs_with_transfer_manager( - test_bucket.name, destination_directory=os.path.join(downloads, "") + test_bucket.name, + destination_directory=os.path.join(downloads, ""), + threads=2, ) out, _ = capsys.readouterr() diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index e5fc55df0..961ab2cdd 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -14,7 +14,7 @@ def upload_many_blobs_with_transfer_manager( - bucket_name, filenames, source_directory="" + bucket_name, filenames, source_directory="", threads=4 ): """Upload every file in a list to a bucket, concurrently in a thread pool. @@ -40,13 +40,20 @@ def upload_many_blobs_with_transfer_manager( # end user input. # source_directory="" + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + from google.cloud.storage import Client, transfer_manager storage_client = Client() bucket = storage_client.bucket(bucket_name) results = transfer_manager.upload_many_from_filenames( - bucket, filenames, source_directory=source_directory + bucket, filenames, source_directory=source_directory, threads=threads ) for name, result in zip(filenames, results): @@ -59,7 +66,7 @@ def upload_many_blobs_with_transfer_manager( print("Uploaded {} to {}.".format(name, bucket.name)) -def upload_directory_with_transfer_manager(bucket_name, directory): +def upload_directory_with_transfer_manager(bucket_name, directory, threads=4): """Upload every file in a directory, including all files in subdirectories. Each blob name is derived from the filename, not including the `directory` @@ -76,6 +83,13 @@ def upload_directory_with_transfer_manager(bucket_name, directory): # working directory". # directory="" + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + from pathlib import Path from google.cloud.storage import Client, transfer_manager @@ -105,7 +119,7 @@ def upload_directory_with_transfer_manager(bucket_name, directory): # Start the upload. 
results = transfer_manager.upload_many_from_filenames( - bucket, string_paths, source_directory=directory + bucket, string_paths, source_directory=directory, threads=threads ) for name, result in zip(string_paths, results): @@ -118,7 +132,9 @@ def upload_directory_with_transfer_manager(bucket_name, directory): print("Uploaded {} to {}.".format(name, bucket.name)) -def download_all_blobs_with_transfer_manager(bucket_name, destination_directory=""): +def download_all_blobs_with_transfer_manager( + bucket_name, destination_directory="", threads=4 +): """Download all of the blobs in a bucket, concurrently in a thread pool. The filename of each blob once downloaded is derived from the blob name and @@ -140,6 +156,13 @@ def download_all_blobs_with_transfer_manager(bucket_name, destination_directory= # intended for unsanitized end user input. # destination_directory = "" + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + from google.cloud.storage import Client, transfer_manager storage_client = Client() @@ -148,7 +171,7 @@ def download_all_blobs_with_transfer_manager(bucket_name, destination_directory= blob_names = [blob.name for blob in bucket.list_blobs()] results = transfer_manager.download_many_to_path( - bucket, blob_names, destination_directory=destination_directory + bucket, blob_names, destination_directory=destination_directory, threads=threads ) for name, result in zip(blob_names, results): From 7a9eb97fcd5da9ad28408e73234e3864acf0cf0a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 14:30:43 -0800 Subject: [PATCH 17/19] fix snippets test issue due to change in dependency --- samples/snippets/snippets_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 2ec02589d..33b428fc0 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -640,7 +640,8 @@ def test_storage_configure_retries(test_blob, capsys): out, _ = capsys.readouterr() assert "The following library method is customized to be retried" in out assert "_should_retry" in out - assert "initial=1.5, maximum=45.0, multiplier=1.2, deadline=500.0" in out + assert "initial=1.5, maximum=45.0, multiplier=1.2" in out + assert "500" in out # "deadline" or "timeout" depending on dependency ver. 
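+    # (google-api-core 2.x renamed the Retry "deadline" argument to "timeout",
+    # so the printed repr may use either name; the numeric value is stable.)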
def test_batch_request(test_bucket): From d24f9c22c49a03907916ca5c998c3ee5eb2673b6 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 14:33:56 -0800 Subject: [PATCH 18/19] snippet nomenclature --- samples/snippets/snippets_test.py | 2 +- samples/snippets/storage_transfer_manager.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 33b428fc0..4ad0dc1a0 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -728,7 +728,7 @@ def test_transfer_manager_directory_upload(test_bucket, capsys): f.write(name) storage_transfer_manager.upload_directory_with_transfer_manager( - test_bucket.name, directory="{}/".format(uploads) + test_bucket.name, source_directory="{}/".format(uploads) ) out, _ = capsys.readouterr() diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index 961ab2cdd..eac0bd1c6 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -66,7 +66,7 @@ def upload_many_blobs_with_transfer_manager( print("Uploaded {} to {}.".format(name, bucket.name)) -def upload_directory_with_transfer_manager(bucket_name, directory, threads=4): +def upload_directory_with_transfer_manager(bucket_name, source_directory, threads=4): """Upload every file in a directory, including all files in subdirectories. Each blob name is derived from the filename, not including the `directory` @@ -81,7 +81,7 @@ def upload_directory_with_transfer_manager(bucket_name, directory, threads=4): # The directory on your computer to upload. Files in the directory and its # subdirectories will be uploaded. An empty string means "the current # working directory". - # directory="" + # source_directory="" # The number of threads to use for the operation. The performance impact of # this value depends on the use case, but generally, smaller files benefit @@ -102,7 +102,7 @@ def upload_directory_with_transfer_manager(bucket_name, directory, threads=4): # multiple lines here for clarity. # First, recursively get all files in `directory` as Path objects. - directory_as_path_obj = Path(directory) + directory_as_path_obj = Path(source_directory) paths = directory_as_path_obj.rglob("*") # Filter so the list only includes files, not directories themselves. @@ -119,7 +119,7 @@ def upload_directory_with_transfer_manager(bucket_name, directory, threads=4): # Start the upload. results = transfer_manager.upload_many_from_filenames( - bucket, string_paths, source_directory=directory, threads=threads + bucket, string_paths, source_directory=source_directory, threads=threads ) for name, result in zip(string_paths, results): From d17bd0a97b6b7e6cbb9fe7524e9e8e23914e8451 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 15:07:24 -0800 Subject: [PATCH 19/19] fix samples for real this time --- samples/snippets/storage_transfer_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py index eac0bd1c6..0a02b96e3 100644 --- a/samples/snippets/storage_transfer_manager.py +++ b/samples/snippets/storage_transfer_manager.py @@ -110,7 +110,7 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, thread # These paths are relative to the current working directory. 
Next, make them # relative to `directory` - relative_paths = [path.relative_to(directory) for path in file_paths] + relative_paths = [path.relative_to(source_directory) for path in file_paths] # Finally, convert them all to strings. string_paths = [str(path) for path in relative_paths]
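
For reference, a minimal usage sketch of the transfer manager API as it stands
after patch 19. The bucket name, directories, and filenames below are
hypothetical placeholders; `threads` is passed to ThreadPoolExecutor as
max_workers, per the docstrings above:

    from google.cloud.storage import Client, transfer_manager

    bucket = Client().bucket("my-bucket")  # hypothetical bucket name
    filenames = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"]

    # Upload: each blob name mirrors the file's path under source_directory.
    upload_results = transfer_manager.upload_many_from_filenames(
        bucket, filenames, source_directory="uploads/", threads=4
    )

    # Each entry is None on success or the exception raised for that file.
    for name, result in zip(filenames, upload_results):
        print(name, "OK" if result is None else result)

    # Download: nested blob names create subdirectories automatically
    # (create_directories=True is the default).
    download_results = transfer_manager.download_many_to_path(
        bucket, filenames, destination_directory="downloads/", threads=4
    )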