From 24f644fca57567cbb66e82276a9ce5b3b0260a37 Mon Sep 17 00:00:00 2001 From: Jeff Date: Thu, 20 Aug 2020 18:48:51 -0700 Subject: [PATCH 01/14] Improve format_bytes() --- frameioclient/utils.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/frameioclient/utils.py b/frameioclient/utils.py index c7aad855..7a7a0a96 100644 --- a/frameioclient/utils.py +++ b/frameioclient/utils.py @@ -24,20 +24,26 @@ def stream(func, page=1, page_size=20): page += 1 -def format_bytes(size): +def format_bytes(size, type="speed"): """ Convert bytes to KB/MB/GB/TB/s """ # 2**10 = 1024 power = 2**10 n = 0 - power_labels = {0 : 'B/s', 1: 'KB/s', 2: 'MB/s', 3: 'GB/s', 4: 'TB/s'} + power_labels = {0 : 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'} while size > power: size /= power n += 1 - return " ".join((str(round(size, 2)), power_labels[n])) + formatted = " ".join((str(round(size, 2)), power_labels[n])) + + if type == "speed": + return formatted + "/s" + + elif type == "size": + return formatted def calculate_hash(file_path): """ From 8fa0d5478b799601e3fc05359952e6f396f61945 Mon Sep 17 00:00:00 2001 From: Jeff Date: Thu, 20 Aug 2020 20:15:27 -0700 Subject: [PATCH 02/14] PoC multi-part accelerated downloads --- frameioclient/client.py | 6 +- frameioclient/download.py | 115 ++++++++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 13 deletions(-) diff --git a/frameioclient/client.py b/frameioclient/client.py index 29e411dc..5638790a 100644 --- a/frameioclient/client.py +++ b/frameioclient/client.py @@ -401,7 +401,7 @@ def upload(self, asset, file): uploader = FrameioUploader(asset, file) uploader.upload() - def download(self, asset, download_folder): + def download(self, asset, download_folder, prefix, acceleration_override=False): """ Download an asset. The method will exit once the file is downloaded. @@ -413,8 +413,8 @@ def download(self, asset, download_folder): client.download(asset, "~./Downloads") """ - downloader = FrameioDownloader(asset, download_folder) - downloader.download() + downloader = FrameioDownloader(asset, download_folder, prefix=prefix) + return downloader.download_handler(acceleration_override=acceleration_override) def get_comment(self, comment_id, **kwargs): """ diff --git a/frameioclient/download.py b/frameioclient/download.py index 8fdcc41e..6bf59516 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -1,13 +1,32 @@ +import io import os import math +import time import requests +import threading +import concurrent.futures +from .utils import format_bytes from .exceptions import DownloadException +thread_local = threading.local() + class FrameioDownloader(object): - def __init__(self, asset, download_folder): + def __init__(self, asset, download_folder, prefix=None): self.asset = asset self.download_folder = download_folder + self.resolution_map = dict() + self.destination = None + self.watermarked = False + self.chunk_manager = dict() + self.chunks = (asset['filesize'] / 52428800) + self.prefix = prefix + self.filename = asset['name'] + + def _get_session(self): + if not hasattr(thread_local, "session"): + thread_local.session = requests.Session() + return thread_local.session def get_download_key(self): try: @@ -35,17 +54,93 @@ def get_download_key(self): return url - def download(self): - original_filename = self.asset['name'] - final_destination = os.path.join(self.download_folder, original_filename) + def get_path(self): + if self.prefix != None: + self.filename = self.prefix + self.filename - print("Final destiation: {}".format(final_destination)) + if self.destination == None: + final_destination = os.path.join(self.download_folder, self.filename) + self.destination = final_destination + + return self.destination - if os.path.isfile(final_destination): - return final_destination + def download_handler(self, acceleration_override=False): + if os.path.isfile(self.get_path()): + return self.destination, 0 else: url = self.get_download_key() - r = requests.get(url) - open(final_destination, 'wb').write(r.content) - return final_destination + + if self.watermarked == True: + return self.download(url) + else: + if acceleration_override == True: + return self.accelerate_download(url) + else: + return self.download(url) + + def download(self, url): + start_time = time.time() + print("Beginning download -- {} -- {}".format(self.asset['name'], format_bytes(self.asset['filesize'], type="size"))) + + # Downloading + r = requests.get(url) + open(self.destination, 'wb').write(r.content) + + download_time = time.time() - start_time + download_speed = format_bytes(math.ceil(self.asset['filesize']/(download_time))) + print("Downloaded {} at {}".format(self.asset['filesize'], download_speed)) + + return self.destination, download_speed + + def accelerate_download(self, url): + start_time = time.time() + offset = (self.asset['filesize'] / self.chunks) + in_byte = 0 # Set initially here, but then override + + print("Accelerated download -- {} -- {}".format(self.asset['name'], format_bytes(self.asset['filesize'], type="size"))) + + # Build chunk manager state + chunk_list = list(range(self.chunks)) + for chunk in chunk_list: + self.chunk_manager.update({ + chunk: False + }) + + with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: + for i in range(self.chunks): + out_byte = offset * (i+1) + + headers = { + "Range": "bytes={}-{}".format(in_byte, out_byte) + } + task = (url, headers, i) + executor.submit(self.get_chunk, task) + + in_byte = out_byte + 1 # Reset new in byte + + # Merge chunks + print("Writing chunks to disk") + with open(self.destination, 'a') as outfile: + for chunk in self.chunk_manager: + outfile.write(self.chunk_manager[chunk]) + + download_time = time.time() - start_time + download_speed = format_bytes(math.ceil(self.asset['filesize']/(download_time))) + print("Downloaded {} at {}".format(self.asset['filesize'], download_speed)) + + return self.destination, download_speed + + def get_chunk(self, task): + url = task[0] + headers = task[1] + chunk_number = task[2] + + session = self._get_session() + + print("Getting chunk {}/{}".format(chunk_number + 1, self.chunks)) + r = session.get(url, headers=headers) + self.chunk_manager[chunk_number] = r.content + print("Completed chunk {}/{}".format(chunk_number + 1, self.chunks)) + + return True \ No newline at end of file From e8316cdf6126855aaca01ba0aba1940bbe5fbdcd Mon Sep 17 00:00:00 2001 From: Jeff Date: Fri, 21 Aug 2020 12:43:05 -0700 Subject: [PATCH 03/14] Tweak accelerated downloader and example --- examples/download_asset.py | 16 +++++++++++++--- frameioclient/download.py | 3 +-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/examples/download_asset.py b/examples/download_asset.py index 546486cc..e95f381d 100644 --- a/examples/download_asset.py +++ b/examples/download_asset.py @@ -2,11 +2,21 @@ from frameioclient import FrameioClient -def download_file(asset_id): +def benchmark(asset_id): token = os.getenv("FRAMEIO_TOKEN") client = FrameioClient(token) asset_info = client.get_asset(asset_id) - client.download(asset_info, "downloads") + normal_filename, normal_speed = client.download(asset_info, "downloads", prefix="normal_", acceleration_override=False) + accelerated_filename, accelerated_speed = client.download(asset_info, "downloads", prefix="accelerated_", acceleration_override=True) + + # normal = client.download(asset_info, "downloads", prefix="normal_", acceleration_override=False) + # accelerated = client.download(asset_info, "downloads", prefix="accelerated_", acceleration_override=True) + + # print(normal, accelerated) + + print("Normal speed: {}, Accelerated speed: {}".format(normal_speed, accelerated_speed)) if __name__ == "__main__": - download_file("6cc9a45f-64a7-456e-a95f-98687220bf6e") \ No newline at end of file + # download_file("60ff4cca-f97b-4311-be24-0eecd6970c01") + benchmark("811baf7a-3248-4c7c-9d94-cc1c6c496a76") + # benchmark("9cee7966-7db1-4066-b326-f9e6f5e929e4") diff --git a/frameioclient/download.py b/frameioclient/download.py index 6bf59516..815140da 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -103,7 +103,7 @@ def accelerate_download(self, url): chunk_list = list(range(self.chunks)) for chunk in chunk_list: self.chunk_manager.update({ - chunk: False + chunk: None }) with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: @@ -143,4 +143,3 @@ def get_chunk(self, task): print("Completed chunk {}/{}".format(chunk_number + 1, self.chunks)) return True - \ No newline at end of file From 13069d40470d2d3c8c50a9306266b65ef914067e Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 25 Aug 2020 11:35:19 -0700 Subject: [PATCH 04/14] Improve chunk size re-use --- frameioclient/download.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index 815140da..9808ebc2 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -19,7 +19,8 @@ def __init__(self, asset, download_folder, prefix=None): self.destination = None self.watermarked = False self.chunk_manager = dict() - self.chunks = (asset['filesize'] / 52428800) + self.chunk_size = 52428800 + self.chunks = math.floor(asset['filesize'] / self.chunk_size) self.prefix = prefix self.filename = asset['name'] @@ -94,7 +95,7 @@ def download(self, url): def accelerate_download(self, url): start_time = time.time() - offset = (self.asset['filesize'] / self.chunks) + offset = math.ceil(self.asset['filesize'] / self.chunks) in_byte = 0 # Set initially here, but then override print("Accelerated download -- {} -- {}".format(self.asset['name'], format_bytes(self.asset['filesize'], type="size"))) From e8dfc422f43cb54423bf000ca80f6e5773e26c41 Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 29 Sep 2020 22:28:34 -0700 Subject: [PATCH 05/14] Finish multi-threaded download --- examples/download_asset.py | 14 ++-- frameioclient/client.py | 6 +- frameioclient/download.py | 126 +++++++++++++++++++++++------------- frameioclient/exceptions.py | 14 +++- 4 files changed, 99 insertions(+), 61 deletions(-) diff --git a/examples/download_asset.py b/examples/download_asset.py index e95f381d..1939614f 100644 --- a/examples/download_asset.py +++ b/examples/download_asset.py @@ -6,17 +6,11 @@ def benchmark(asset_id): token = os.getenv("FRAMEIO_TOKEN") client = FrameioClient(token) asset_info = client.get_asset(asset_id) - normal_filename, normal_speed = client.download(asset_info, "downloads", prefix="normal_", acceleration_override=False) - accelerated_filename, accelerated_speed = client.download(asset_info, "downloads", prefix="accelerated_", acceleration_override=True) + accelerated_filename = client.download(asset_info, "downloads", prefix="accelerated_", acceleration=True, concurrency=20) - # normal = client.download(asset_info, "downloads", prefix="normal_", acceleration_override=False) - # accelerated = client.download(asset_info, "downloads", prefix="accelerated_", acceleration_override=True) - - # print(normal, accelerated) - - print("Normal speed: {}, Accelerated speed: {}".format(normal_speed, accelerated_speed)) + # print("Normal speed: {}, Accelerated speed: {}".format(normal_speed, accelerated_speed)) if __name__ == "__main__": # download_file("60ff4cca-f97b-4311-be24-0eecd6970c01") - benchmark("811baf7a-3248-4c7c-9d94-cc1c6c496a76") - # benchmark("9cee7966-7db1-4066-b326-f9e6f5e929e4") + benchmark("20a1df34-e8ad-48fd-b455-c68294cc7f71") + # benchmark("9cee7966-7db1-4066-b326-f9e6f5e929e4") \ No newline at end of file diff --git a/frameioclient/client.py b/frameioclient/client.py index 5638790a..563de01d 100644 --- a/frameioclient/client.py +++ b/frameioclient/client.py @@ -401,7 +401,7 @@ def upload(self, asset, file): uploader = FrameioUploader(asset, file) uploader.upload() - def download(self, asset, download_folder, prefix, acceleration_override=False): + def download(self, asset, download_folder, prefix=None, acceleration=False, concurrency=5): """ Download an asset. The method will exit once the file is downloaded. @@ -413,8 +413,8 @@ def download(self, asset, download_folder, prefix, acceleration_override=False): client.download(asset, "~./Downloads") """ - downloader = FrameioDownloader(asset, download_folder, prefix=prefix) - return downloader.download_handler(acceleration_override=acceleration_override) + downloader = FrameioDownloader(asset, download_folder, prefix, acceleration, concurrency) + return downloader.download_handler() def get_comment(self, comment_id, **kwargs): """ diff --git a/frameioclient/download.py b/frameioclient/download.py index 9808ebc2..ede823a9 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -7,28 +7,45 @@ import concurrent.futures from .utils import format_bytes -from .exceptions import DownloadException +from .exceptions import DownloadException, WatermarkIDDownloadException thread_local = threading.local() class FrameioDownloader(object): - def __init__(self, asset, download_folder, prefix=None): + def __init__(self, asset, download_folder, prefix, acceleration=False, concurrency=5): + self.acceleration = acceleration self.asset = asset self.download_folder = download_folder self.resolution_map = dict() self.destination = None self.watermarked = False - self.chunk_manager = dict() - self.chunk_size = 52428800 - self.chunks = math.floor(asset['filesize'] / self.chunk_size) + self.file_size = asset['filesize'] + self.concurrency = concurrency + self.futures = list() + self.chunk_size = 52428800 / 2 # 25 MB chunk size or so + self.chunks = self.calculate_chunk_count(self.file_size, self.chunk_size) self.prefix = prefix self.filename = asset['name'] + @staticmethod + def calculate_chunk_count(file_size, chunk_size): + return math.floor(file_size/chunk_size) + def _get_session(self): if not hasattr(thread_local, "session"): thread_local.session = requests.Session() return thread_local.session + def _create_file_stub(self): + try: + fp = open(self.destination, "wb") + fp.write(b'\0' * self.file_size) + fp.close() + except Exception as e: + print(e) + return False + return True + def get_download_key(self): try: url = self.asset['original'] @@ -51,7 +68,7 @@ def get_download_key(self): except KeyError: raise DownloadException else: - raise DownloadException + raise WatermarkIDDownloadException return url @@ -65,82 +82,99 @@ def get_path(self): return self.destination - def download_handler(self, acceleration_override=False): + def download_handler(self): if os.path.isfile(self.get_path()): - return self.destination, 0 + print("File already exists at this location.") + return self.destination else: url = self.get_download_key() if self.watermarked == True: return self.download(url) else: - if acceleration_override == True: - return self.accelerate_download(url) + if self.acceleration == True: + return self.accelerated_download(url) else: return self.download(url) def download(self, url): start_time = time.time() - print("Beginning download -- {} -- {}".format(self.asset['name'], format_bytes(self.asset['filesize'], type="size"))) + print("Beginning download -- {} -- {}".format(self.asset['name'], format_bytes(self.file_size, type="size"))) # Downloading r = requests.get(url) open(self.destination, 'wb').write(r.content) download_time = time.time() - start_time - download_speed = format_bytes(math.ceil(self.asset['filesize']/(download_time))) - print("Downloaded {} at {}".format(self.asset['filesize'], download_speed)) + download_speed = format_bytes(math.ceil(self.file_size/(download_time))) + print("Downloaded {} at {}".format(self.file_size, download_speed)) return self.destination, download_speed - def accelerate_download(self, url): + def accelerated_download(self, url): start_time = time.time() + + # Generate stub + try: + self._create_file_stub() + + except Exception as e: + raise DownloadException + print("Aborting", e) + offset = math.ceil(self.asset['filesize'] / self.chunks) in_byte = 0 # Set initially here, but then override - print("Accelerated download -- {} -- {}".format(self.asset['name'], format_bytes(self.asset['filesize'], type="size"))) - - # Build chunk manager state - chunk_list = list(range(self.chunks)) - for chunk in chunk_list: - self.chunk_manager.update({ - chunk: None - }) + print("Accelerated download -- {} -- {}".format(self.asset['name'], format_bytes(self.file_size, type="size"))) - with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: + # Queue up threads + with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: for i in range(self.chunks): - out_byte = offset * (i+1) - - headers = { - "Range": "bytes={}-{}".format(in_byte, out_byte) - } - task = (url, headers, i) - executor.submit(self.get_chunk, task) + out_byte = offset * (i+1) # Advance by one byte to get proper offset + task = (url, in_byte, out_byte, i) + self.futures.append(executor.submit(self.download_chunk, task)) in_byte = out_byte + 1 # Reset new in byte + + # Wait on threads to finish + for future in concurrent.futures.as_completed(self.futures): + try: + status = future.result() + print(status) + except Exception as exc: + print(exc) + + # Calculate and print stats + download_time = time.time() - start_time + download_speed = format_bytes(math.ceil(self.file_size/(download_time))) + print("Downloaded {} at {}".format(self.file_size, download_speed)) - # Merge chunks - print("Writing chunks to disk") - with open(self.destination, 'a') as outfile: - for chunk in self.chunk_manager: - outfile.write(self.chunk_manager[chunk]) + return self.destination - download_time = time.time() - start_time - download_speed = format_bytes(math.ceil(self.asset['filesize']/(download_time))) - print("Downloaded {} at {}".format(self.asset['filesize'], download_speed)) - return self.destination, download_speed + def download_chunk(self, task): + # Download a particular chunk + # Called by the threadpool execuor - def get_chunk(self, task): url = task[0] - headers = task[1] - chunk_number = task[2] + start_byte = task[1] + end_byte = task[2] + chunk_number = task[3] session = self._get_session() - print("Getting chunk {}/{}".format(chunk_number + 1, self.chunks)) - r = session.get(url, headers=headers) - self.chunk_manager[chunk_number] = r.content + + # Specify the starting and ending of the file + headers = {'Range': 'bytes=%d-%d' % (start_byte, end_byte)} + + # Grab the data as a stream + r = session.get(url, headers=headers, stream=True) + + with open(self.destination, "r+b") as fp: + fp.seek(start_byte) # Seek to the right of the file + fp.write(r.content) # Write the data + print("Done writing chunk {}/{}".format(chunk_number + 1, self.chunks)) + print("Completed chunk {}/{}".format(chunk_number + 1, self.chunks)) - return True + return "Complete!" diff --git a/frameioclient/exceptions.py b/frameioclient/exceptions.py index 7e1b831d..e68f50b8 100644 --- a/frameioclient/exceptions.py +++ b/frameioclient/exceptions.py @@ -11,7 +11,7 @@ def __init__( self.message = message super().__init__(self.message) -class DownloadException(Exception): +class WatermarkIDDownloadException(Exception): """Exception raised when trying to download a file where there is no available download URL. """ @@ -20,4 +20,14 @@ def __init__( message="This file is unavailable for download due to security and permission settings." ): self.message = message - super().__init__(self.message) \ No newline at end of file + super().__init__(self.message) + +class DownloadException(Exception): + """Exception raised when trying to download a file + """ + def __init__( + self, + message="Unable to download for some reason." + ): + self.message = message + super().__init__(self.message) From 054dad352de2a29e9c2d602e357bdf604a504b8f Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 29 Sep 2020 22:32:03 -0700 Subject: [PATCH 06/14] Turn on acceleration for the test --- tests/integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration.py b/tests/integration.py index cb6c9b6a..42928e31 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -107,7 +107,7 @@ def test_download(client, override=False): start_time = time.time() print("{}/{} Beginning to download: {}".format(count, len(asset_list), asset['name'])) - client.download(asset, 'downloads') + client.download(asset, 'downloads', acceleration=True, concurrency=20) download_time = time.time() - start_time download_speed = format_bytes(ceil(asset['filesize']/(download_time))) From d9dbe4b562ed1009f9796c374133db4ae057b395 Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 29 Sep 2020 22:35:38 -0700 Subject: [PATCH 07/14] Wrap arithmetic for py2 --- frameioclient/download.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index ede823a9..fd3d83d5 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -22,7 +22,7 @@ def __init__(self, asset, download_folder, prefix, acceleration=False, concurren self.file_size = asset['filesize'] self.concurrency = concurrency self.futures = list() - self.chunk_size = 52428800 / 2 # 25 MB chunk size or so + self.chunk_size = (52428800 / 2) # 25 MB chunk size or so self.chunks = self.calculate_chunk_count(self.file_size, self.chunk_size) self.prefix = prefix self.filename = asset['name'] From 5e30c05a732eafc0d84b7a60f55f852568778fc5 Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 29 Sep 2020 22:38:40 -0700 Subject: [PATCH 08/14] Py 2.7 fixes --- frameioclient/download.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index fd3d83d5..f7a63825 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -122,14 +122,14 @@ def accelerated_download(self, url): raise DownloadException print("Aborting", e) - offset = math.ceil(self.asset['filesize'] / self.chunks) + offset = math.ceil(self.file_size / self.chunks) in_byte = 0 # Set initially here, but then override print("Accelerated download -- {} -- {}".format(self.asset['name'], format_bytes(self.file_size, type="size"))) # Queue up threads with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: - for i in range(self.chunks): + for i in range(int(self.chunks)): out_byte = offset * (i+1) # Advance by one byte to get proper offset task = (url, in_byte, out_byte, i) From 2fc9cae601f1d6f5838557fc0c1e90e7a6509831 Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 29 Sep 2020 22:42:20 -0700 Subject: [PATCH 09/14] Improve cast to int() for 2.7 in right spot --- frameioclient/download.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index f7a63825..f71695df 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -29,7 +29,7 @@ def __init__(self, asset, download_folder, prefix, acceleration=False, concurren @staticmethod def calculate_chunk_count(file_size, chunk_size): - return math.floor(file_size/chunk_size) + return int(math.floor(file_size/chunk_size)) # Extra cast to handle Py2 def _get_session(self): if not hasattr(thread_local, "session"): @@ -129,7 +129,7 @@ def accelerated_download(self, url): # Queue up threads with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: - for i in range(int(self.chunks)): + for i in range(self.chunks): out_byte = offset * (i+1) # Advance by one byte to get proper offset task = (url, in_byte, out_byte, i) From fecb31fea68a050f79b8f3e4a96aed5cc3e1ab96 Mon Sep 17 00:00:00 2001 From: Jeff Date: Tue, 29 Sep 2020 22:46:37 -0700 Subject: [PATCH 10/14] Revert fancy math --- frameioclient/download.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index f71695df..19d84ee3 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -23,14 +23,10 @@ def __init__(self, asset, download_folder, prefix, acceleration=False, concurren self.concurrency = concurrency self.futures = list() self.chunk_size = (52428800 / 2) # 25 MB chunk size or so - self.chunks = self.calculate_chunk_count(self.file_size, self.chunk_size) + self.chunks = math.floor(self.file_size/self.chunk_size) self.prefix = prefix self.filename = asset['name'] - @staticmethod - def calculate_chunk_count(file_size, chunk_size): - return int(math.floor(file_size/chunk_size)) # Extra cast to handle Py2 - def _get_session(self): if not hasattr(thread_local, "session"): thread_local.session = requests.Session() @@ -129,7 +125,7 @@ def accelerated_download(self, url): # Queue up threads with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: - for i in range(self.chunks): + for i in range(int(self.chunks)): out_byte = offset * (i+1) # Advance by one byte to get proper offset task = (url, in_byte, out_byte, i) From 3afce4e5228d15090d03cc291ef80d2d5a630280 Mon Sep 17 00:00:00 2001 From: Jeff Date: Mon, 12 Oct 2020 19:46:18 -0700 Subject: [PATCH 11/14] Updates per comments on PR --- examples/download_asset.py | 2 +- frameioclient/client.py | 4 +-- frameioclient/download.py | 62 ++++++++++++++++++++----------------- frameioclient/exceptions.py | 12 ++++++- frameioclient/utils.py | 23 +++++++++++++- tests/integration.py | 2 +- 6 files changed, 71 insertions(+), 34 deletions(-) diff --git a/examples/download_asset.py b/examples/download_asset.py index 1939614f..2dfd6604 100644 --- a/examples/download_asset.py +++ b/examples/download_asset.py @@ -6,7 +6,7 @@ def benchmark(asset_id): token = os.getenv("FRAMEIO_TOKEN") client = FrameioClient(token) asset_info = client.get_asset(asset_id) - accelerated_filename = client.download(asset_info, "downloads", prefix="accelerated_", acceleration=True, concurrency=20) + accelerated_filename = client.download(asset_info, "downloads", prefix="accelerated_", multi_part=True, concurrency=20) # print("Normal speed: {}, Accelerated speed: {}".format(normal_speed, accelerated_speed)) diff --git a/frameioclient/client.py b/frameioclient/client.py index 563de01d..0c6c39f2 100644 --- a/frameioclient/client.py +++ b/frameioclient/client.py @@ -401,7 +401,7 @@ def upload(self, asset, file): uploader = FrameioUploader(asset, file) uploader.upload() - def download(self, asset, download_folder, prefix=None, acceleration=False, concurrency=5): + def download(self, asset, download_folder, prefix=None, multi_part=False, concurrency=5): """ Download an asset. The method will exit once the file is downloaded. @@ -413,7 +413,7 @@ def download(self, asset, download_folder, prefix=None, acceleration=False, conc client.download(asset, "~./Downloads") """ - downloader = FrameioDownloader(asset, download_folder, prefix, acceleration, concurrency) + downloader = FrameioDownloader(asset, download_folder, prefix, multi_part, concurrency) return downloader.download_handler() def get_comment(self, comment_id, **kwargs): diff --git a/frameioclient/download.py b/frameioclient/download.py index 19d84ee3..72089133 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -6,26 +6,36 @@ import threading import concurrent.futures -from .utils import format_bytes -from .exceptions import DownloadException, WatermarkIDDownloadException +from .utils import format_bytes, normalize_filename +from .exceptions import DownloadException, WatermarkIDDownloadException, AssetNotFullyUploaded thread_local = threading.local() class FrameioDownloader(object): - def __init__(self, asset, download_folder, prefix, acceleration=False, concurrency=5): - self.acceleration = acceleration + def __init__(self, asset, download_folder, prefix, multi_part=False, concurrency=5): + self.multi_part = multi_part self.asset = asset + self.asset_type = None self.download_folder = download_folder self.resolution_map = dict() self.destination = None self.watermarked = False - self.file_size = asset['filesize'] + self.file_size = asset["filesize"] self.concurrency = concurrency self.futures = list() - self.chunk_size = (52428800 / 2) # 25 MB chunk size or so - self.chunks = math.floor(self.file_size/self.chunk_size) + self.chunk_size = (25 * 1024 * 1024) # 25 MB chunk size + self.chunks = math.ceil(self.file_size/self.chunk_size) self.prefix = prefix - self.filename = asset['name'] + self.filename = normalize_filename(asset["name"]) + + self._evaluate_asset() + + def _evaluate_asset(self): + if self.asset.get("_type") != "file": + raise DownloadException(message="Unsupport Asset type: {}".format(self.asset.get("_type"))) + + if self.asset.get("upload_completed_at") == None: + raise AssetNotFullyUploaded def _get_session(self): if not hasattr(thread_local, "session"): @@ -35,11 +45,11 @@ def _get_session(self): def _create_file_stub(self): try: fp = open(self.destination, "wb") - fp.write(b'\0' * self.file_size) + fp.write(b"\0" * self.file_size) fp.close() - except Exception as e: + except FileExistsError as e: print(e) - return False + raise e return True def get_download_key(self): @@ -88,26 +98,26 @@ def download_handler(self): if self.watermarked == True: return self.download(url) else: - if self.acceleration == True: - return self.accelerated_download(url) + if self.multi_part == True: + return self.multi_part_download(url) else: return self.download(url) def download(self, url): start_time = time.time() - print("Beginning download -- {} -- {}".format(self.asset['name'], format_bytes(self.file_size, type="size"))) + print("Beginning download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size"))) # Downloading r = requests.get(url) - open(self.destination, 'wb').write(r.content) + open(self.destination, "wb").write(r.content) download_time = time.time() - start_time download_speed = format_bytes(math.ceil(self.file_size/(download_time))) - print("Downloaded {} at {}".format(self.file_size, download_speed)) + print("Downloaded {} at {}".format(format_bytes(self.file_size), download_speed)) return self.destination, download_speed - def accelerated_download(self, url): + def multi_part_download(self, url): start_time = time.time() # Generate stub @@ -115,22 +125,21 @@ def accelerated_download(self, url): self._create_file_stub() except Exception as e: - raise DownloadException - print("Aborting", e) + raise DownloadException(message=e) offset = math.ceil(self.file_size / self.chunks) in_byte = 0 # Set initially here, but then override - print("Accelerated download -- {} -- {}".format(self.asset['name'], format_bytes(self.file_size, type="size"))) + print("Multi-part download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size"))) # Queue up threads with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: for i in range(int(self.chunks)): - out_byte = offset * (i+1) # Advance by one byte to get proper offset + out_byte = offset * (i+1) # Increment by the iterable + 1 so we don't mutiply by zero task = (url, in_byte, out_byte, i) self.futures.append(executor.submit(self.download_chunk, task)) - in_byte = out_byte + 1 # Reset new in byte + in_byte = out_byte # Reset new in byte equal to last out byte # Wait on threads to finish for future in concurrent.futures.as_completed(self.futures): @@ -143,14 +152,13 @@ def accelerated_download(self, url): # Calculate and print stats download_time = time.time() - start_time download_speed = format_bytes(math.ceil(self.file_size/(download_time))) - print("Downloaded {} at {}".format(self.file_size, download_speed)) + print("Downloaded {} at {}".format(format_bytes(self.file_size), download_speed)) return self.destination - def download_chunk(self, task): # Download a particular chunk - # Called by the threadpool execuor + # Called by the threadpool executor url = task[0] start_byte = task[1] @@ -161,7 +169,7 @@ def download_chunk(self, task): print("Getting chunk {}/{}".format(chunk_number + 1, self.chunks)) # Specify the starting and ending of the file - headers = {'Range': 'bytes=%d-%d' % (start_byte, end_byte)} + headers = {"Range": "bytes=%d-%d" % (start_byte, end_byte)} # Grab the data as a stream r = session.get(url, headers=headers, stream=True) @@ -171,6 +179,4 @@ def download_chunk(self, task): fp.write(r.content) # Write the data print("Done writing chunk {}/{}".format(chunk_number + 1, self.chunks)) - print("Completed chunk {}/{}".format(chunk_number + 1, self.chunks)) - return "Complete!" diff --git a/frameioclient/exceptions.py b/frameioclient/exceptions.py index e68f50b8..2005cec2 100644 --- a/frameioclient/exceptions.py +++ b/frameioclient/exceptions.py @@ -27,7 +27,17 @@ class DownloadException(Exception): """ def __init__( self, - message="Unable to download for some reason." + message="Generic Dowload exception." + ): + self.message = message + super().__init__(self.message) + +class AssetNotFullyUploaded(Exception): + """Exception raised when trying to download a file that isn't yet fully upload. + """ + def __init__( + self, + message="Unable to download this asset because it not yet fully uploaded." ): self.message = message super().__init__(self.message) diff --git a/frameioclient/utils.py b/frameioclient/utils.py index 7a7a0a96..bc0a45ae 100644 --- a/frameioclient/utils.py +++ b/frameioclient/utils.py @@ -1,5 +1,6 @@ import xxhash import sys +import re KB = 1024 MB = KB * KB @@ -79,4 +80,24 @@ def compare_items(dict1, dict2): if comparison == False: print("File mismatch between upload and download") - return comparison \ No newline at end of file + return comparison + +def get_valid_filename(s): + """ + Strip out invalid characters from a filename using regex + """ + s = str(s).strip().replace(' ', '_') + return re.sub(r'(?u)[^-\w.]', '', s) + +def normalize_filename(fn): + """ + Normalize filename using pure python + """ + validchars = "-_.() " + out = "" + for c in fn: + if str.isalpha(c) or str.isdigit(c) or (c in validchars): + out += c + else: + out += "_" + return out \ No newline at end of file diff --git a/tests/integration.py b/tests/integration.py index 42928e31..5e917005 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -107,7 +107,7 @@ def test_download(client, override=False): start_time = time.time() print("{}/{} Beginning to download: {}".format(count, len(asset_list), asset['name'])) - client.download(asset, 'downloads', acceleration=True, concurrency=20) + client.download(asset, 'downloads', multi_part=True, concurrency=20) download_time = time.time() - start_time download_speed = format_bytes(ceil(asset['filesize']/(download_time))) From e2508a7dddfcfb6342e33591f2133c243e3e6139 Mon Sep 17 00:00:00 2001 From: Jeff Date: Mon, 12 Oct 2020 22:38:26 -0700 Subject: [PATCH 12/14] Fix unicode in py 2 Extra prints More verbosity Change the way we detect unicode Fix wrong variable Force to string Remove extra prints --- frameioclient/utils.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/frameioclient/utils.py b/frameioclient/utils.py index bc0a45ae..17710a5d 100644 --- a/frameioclient/utils.py +++ b/frameioclient/utils.py @@ -95,9 +95,17 @@ def normalize_filename(fn): """ validchars = "-_.() " out = "" + + if isinstance(fn, str): + pass + elif isinstance(fn, unicode): + fn = str(fn.decode('utf-8', 'ignore')) + else: + pass + for c in fn: if str.isalpha(c) or str.isdigit(c) or (c in validchars): out += c else: out += "_" - return out \ No newline at end of file + return out From d6df6a4bd918357bcbb8e5b1040cbf02de4d97cb Mon Sep 17 00:00:00 2001 From: Jeff Date: Mon, 12 Oct 2020 22:41:29 -0700 Subject: [PATCH 13/14] Fix filsize formatting in download.py --- frameioclient/download.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index 72089133..2b93446a 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -113,7 +113,7 @@ def download(self, url): download_time = time.time() - start_time download_speed = format_bytes(math.ceil(self.file_size/(download_time))) - print("Downloaded {} at {}".format(format_bytes(self.file_size), download_speed)) + print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed)) return self.destination, download_speed @@ -152,7 +152,7 @@ def multi_part_download(self, url): # Calculate and print stats download_time = time.time() - start_time download_speed = format_bytes(math.ceil(self.file_size/(download_time))) - print("Downloaded {} at {}".format(format_bytes(self.file_size), download_speed)) + print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed)) return self.destination From 26115921d8a936ca7a513f88cc8240886b3fd624 Mon Sep 17 00:00:00 2001 From: Jeff Date: Wed, 14 Oct 2020 13:05:44 -0700 Subject: [PATCH 14/14] Tweak self.watermarked --- frameioclient/download.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/frameioclient/download.py b/frameioclient/download.py index 2b93446a..3f331296 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -19,7 +19,7 @@ def __init__(self, asset, download_folder, prefix, multi_part=False, concurrency self.download_folder = download_folder self.resolution_map = dict() self.destination = None - self.watermarked = False + self.watermarked = asset['is_session_watermarked'] # Default is probably false self.file_size = asset["filesize"] self.concurrency = concurrency self.futures = list() @@ -56,7 +56,7 @@ def get_download_key(self): try: url = self.asset['original'] except KeyError as e: - if self.asset['is_session_watermarked'] == True: + if self.watermarked == True: resolution_list = list() try: for resolution_key, download_url in sorted(self.asset['downloads'].items()):