diff --git a/examples/download_asset.py b/examples/download_asset.py index 546486cc..2dfd6604 100644 --- a/examples/download_asset.py +++ b/examples/download_asset.py @@ -2,11 +2,15 @@ from frameioclient import FrameioClient -def download_file(asset_id): +def benchmark(asset_id): token = os.getenv("FRAMEIO_TOKEN") client = FrameioClient(token) asset_info = client.get_asset(asset_id) - client.download(asset_info, "downloads") + accelerated_filename = client.download(asset_info, "downloads", prefix="accelerated_", multi_part=True, concurrency=20) + + # print("Normal speed: {}, Accelerated speed: {}".format(normal_speed, accelerated_speed)) if __name__ == "__main__": - download_file("6cc9a45f-64a7-456e-a95f-98687220bf6e") \ No newline at end of file + # download_file("60ff4cca-f97b-4311-be24-0eecd6970c01") + benchmark("20a1df34-e8ad-48fd-b455-c68294cc7f71") + # benchmark("9cee7966-7db1-4066-b326-f9e6f5e929e4") \ No newline at end of file diff --git a/frameioclient/client.py b/frameioclient/client.py index 29e411dc..0c6c39f2 100644 --- a/frameioclient/client.py +++ b/frameioclient/client.py @@ -401,7 +401,7 @@ def upload(self, asset, file): uploader = FrameioUploader(asset, file) uploader.upload() - def download(self, asset, download_folder): + def download(self, asset, download_folder, prefix=None, multi_part=False, concurrency=5): """ Download an asset. The method will exit once the file is downloaded. @@ -413,8 +413,8 @@ def download(self, asset, download_folder): client.download(asset, "~./Downloads") """ - downloader = FrameioDownloader(asset, download_folder) - downloader.download() + downloader = FrameioDownloader(asset, download_folder, prefix, multi_part, concurrency) + return downloader.download_handler() def get_comment(self, comment_id, **kwargs): """ diff --git a/frameioclient/download.py b/frameioclient/download.py index 8fdcc41e..3f331296 100644 --- a/frameioclient/download.py +++ b/frameioclient/download.py @@ -1,19 +1,62 @@ +import io import os import math +import time import requests +import threading +import concurrent.futures -from .exceptions import DownloadException +from .utils import format_bytes, normalize_filename +from .exceptions import DownloadException, WatermarkIDDownloadException, AssetNotFullyUploaded + +thread_local = threading.local() class FrameioDownloader(object): - def __init__(self, asset, download_folder): + def __init__(self, asset, download_folder, prefix, multi_part=False, concurrency=5): + self.multi_part = multi_part self.asset = asset + self.asset_type = None self.download_folder = download_folder + self.resolution_map = dict() + self.destination = None + self.watermarked = asset['is_session_watermarked'] # Default is probably false + self.file_size = asset["filesize"] + self.concurrency = concurrency + self.futures = list() + self.chunk_size = (25 * 1024 * 1024) # 25 MB chunk size + self.chunks = math.ceil(self.file_size/self.chunk_size) + self.prefix = prefix + self.filename = normalize_filename(asset["name"]) + + self._evaluate_asset() + + def _evaluate_asset(self): + if self.asset.get("_type") != "file": + raise DownloadException(message="Unsupport Asset type: {}".format(self.asset.get("_type"))) + + if self.asset.get("upload_completed_at") == None: + raise AssetNotFullyUploaded + + def _get_session(self): + if not hasattr(thread_local, "session"): + thread_local.session = requests.Session() + return thread_local.session + + def _create_file_stub(self): + try: + fp = open(self.destination, "wb") + fp.write(b"\0" * self.file_size) + fp.close() + except FileExistsError as e: + print(e) + raise e + return True def get_download_key(self): try: url = self.asset['original'] except KeyError as e: - if self.asset['is_session_watermarked'] == True: + if self.watermarked == True: resolution_list = list() try: for resolution_key, download_url in sorted(self.asset['downloads'].items()): @@ -31,21 +74,109 @@ def get_download_key(self): except KeyError: raise DownloadException else: - raise DownloadException + raise WatermarkIDDownloadException return url - def download(self): - original_filename = self.asset['name'] - final_destination = os.path.join(self.download_folder, original_filename) + def get_path(self): + if self.prefix != None: + self.filename = self.prefix + self.filename - print("Final destiation: {}".format(final_destination)) + if self.destination == None: + final_destination = os.path.join(self.download_folder, self.filename) + self.destination = final_destination + + return self.destination - if os.path.isfile(final_destination): - return final_destination + def download_handler(self): + if os.path.isfile(self.get_path()): + print("File already exists at this location.") + return self.destination else: url = self.get_download_key() - r = requests.get(url) - open(final_destination, 'wb').write(r.content) - return final_destination - \ No newline at end of file + + if self.watermarked == True: + return self.download(url) + else: + if self.multi_part == True: + return self.multi_part_download(url) + else: + return self.download(url) + + def download(self, url): + start_time = time.time() + print("Beginning download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size"))) + + # Downloading + r = requests.get(url) + open(self.destination, "wb").write(r.content) + + download_time = time.time() - start_time + download_speed = format_bytes(math.ceil(self.file_size/(download_time))) + print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed)) + + return self.destination, download_speed + + def multi_part_download(self, url): + start_time = time.time() + + # Generate stub + try: + self._create_file_stub() + + except Exception as e: + raise DownloadException(message=e) + + offset = math.ceil(self.file_size / self.chunks) + in_byte = 0 # Set initially here, but then override + + print("Multi-part download -- {} -- {}".format(self.asset["name"], format_bytes(self.file_size, type="size"))) + + # Queue up threads + with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: + for i in range(int(self.chunks)): + out_byte = offset * (i+1) # Increment by the iterable + 1 so we don't mutiply by zero + task = (url, in_byte, out_byte, i) + + self.futures.append(executor.submit(self.download_chunk, task)) + in_byte = out_byte # Reset new in byte equal to last out byte + + # Wait on threads to finish + for future in concurrent.futures.as_completed(self.futures): + try: + status = future.result() + print(status) + except Exception as exc: + print(exc) + + # Calculate and print stats + download_time = time.time() - start_time + download_speed = format_bytes(math.ceil(self.file_size/(download_time))) + print("Downloaded {} at {}".format(format_bytes(self.file_size, type="size"), download_speed)) + + return self.destination + + def download_chunk(self, task): + # Download a particular chunk + # Called by the threadpool executor + + url = task[0] + start_byte = task[1] + end_byte = task[2] + chunk_number = task[3] + + session = self._get_session() + print("Getting chunk {}/{}".format(chunk_number + 1, self.chunks)) + + # Specify the starting and ending of the file + headers = {"Range": "bytes=%d-%d" % (start_byte, end_byte)} + + # Grab the data as a stream + r = session.get(url, headers=headers, stream=True) + + with open(self.destination, "r+b") as fp: + fp.seek(start_byte) # Seek to the right of the file + fp.write(r.content) # Write the data + print("Done writing chunk {}/{}".format(chunk_number + 1, self.chunks)) + + return "Complete!" diff --git a/frameioclient/exceptions.py b/frameioclient/exceptions.py index 7e1b831d..2005cec2 100644 --- a/frameioclient/exceptions.py +++ b/frameioclient/exceptions.py @@ -11,7 +11,7 @@ def __init__( self.message = message super().__init__(self.message) -class DownloadException(Exception): +class WatermarkIDDownloadException(Exception): """Exception raised when trying to download a file where there is no available download URL. """ @@ -20,4 +20,24 @@ def __init__( message="This file is unavailable for download due to security and permission settings." ): self.message = message - super().__init__(self.message) \ No newline at end of file + super().__init__(self.message) + +class DownloadException(Exception): + """Exception raised when trying to download a file + """ + def __init__( + self, + message="Generic Dowload exception." + ): + self.message = message + super().__init__(self.message) + +class AssetNotFullyUploaded(Exception): + """Exception raised when trying to download a file that isn't yet fully upload. + """ + def __init__( + self, + message="Unable to download this asset because it not yet fully uploaded." + ): + self.message = message + super().__init__(self.message) diff --git a/frameioclient/utils.py b/frameioclient/utils.py index c7aad855..17710a5d 100644 --- a/frameioclient/utils.py +++ b/frameioclient/utils.py @@ -1,5 +1,6 @@ import xxhash import sys +import re KB = 1024 MB = KB * KB @@ -24,20 +25,26 @@ def stream(func, page=1, page_size=20): page += 1 -def format_bytes(size): +def format_bytes(size, type="speed"): """ Convert bytes to KB/MB/GB/TB/s """ # 2**10 = 1024 power = 2**10 n = 0 - power_labels = {0 : 'B/s', 1: 'KB/s', 2: 'MB/s', 3: 'GB/s', 4: 'TB/s'} + power_labels = {0 : 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'} while size > power: size /= power n += 1 - return " ".join((str(round(size, 2)), power_labels[n])) + formatted = " ".join((str(round(size, 2)), power_labels[n])) + + if type == "speed": + return formatted + "/s" + + elif type == "size": + return formatted def calculate_hash(file_path): """ @@ -73,4 +80,32 @@ def compare_items(dict1, dict2): if comparison == False: print("File mismatch between upload and download") - return comparison \ No newline at end of file + return comparison + +def get_valid_filename(s): + """ + Strip out invalid characters from a filename using regex + """ + s = str(s).strip().replace(' ', '_') + return re.sub(r'(?u)[^-\w.]', '', s) + +def normalize_filename(fn): + """ + Normalize filename using pure python + """ + validchars = "-_.() " + out = "" + + if isinstance(fn, str): + pass + elif isinstance(fn, unicode): + fn = str(fn.decode('utf-8', 'ignore')) + else: + pass + + for c in fn: + if str.isalpha(c) or str.isdigit(c) or (c in validchars): + out += c + else: + out += "_" + return out diff --git a/tests/integration.py b/tests/integration.py index cb6c9b6a..5e917005 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -107,7 +107,7 @@ def test_download(client, override=False): start_time = time.time() print("{}/{} Beginning to download: {}".format(count, len(asset_list), asset['name'])) - client.download(asset, 'downloads') + client.download(asset, 'downloads', multi_part=True, concurrency=20) download_time = time.time() - start_time download_speed = format_bytes(ceil(asset['filesize']/(download_time)))