From 9e92898cf3304d864727a75835988c91b6c1e7e8 Mon Sep 17 00:00:00 2001 From: rvalerio Date: Fri, 22 Aug 2025 11:52:39 +0100 Subject: [PATCH] Allow reusing single task files in remote assets --- task-runner/task_runner/api_client.py | 31 +++++++ task-runner/task_runner/file_manager.py | 103 ++++++++++++++++++++---- 2 files changed, 117 insertions(+), 17 deletions(-) diff --git a/task-runner/task_runner/api_client.py b/task-runner/task_runner/api_client.py index c5fe7580..1ef0601f 100644 --- a/task-runner/task_runner/api_client.py +++ b/task-runner/task_runner/api_client.py @@ -260,6 +260,37 @@ def get_signed_urls( ) return resp.json() + def get_zip_file_range( + self, + filename: str, + path: str, + zip_relative_path: Optional[str] = None, + ): + """ + Get the byte range (start and end) of a file inside a ZIP archive. + + :param filename: The file inside the ZIP. + :param path: Full path of the ZIP file in storage. + :param zip_relative_path: Relative path inside the zip. + """ + params = {"filename": filename, "path": path} + if zip_relative_path is not None: + params["zip_relative_path"] = zip_relative_path + + resp = self._request( + method="GET", + path="/storage/zip-file-range", + raise_exception=True, + attempts=HTTP_REQUEST_MAX_ATTEMPTS, + params=params, + ) + data = resp.json() + return ( + data["range_start"], + data["range_end"], + data["compress_type"], + ) + def get_download_input_url(self, storage_dir: str) -> str: return self.get_signed_urls( paths=[f"{storage_dir}/{INPUT_ZIP_FILENAME}"], diff --git a/task-runner/task_runner/file_manager.py b/task-runner/task_runner/file_manager.py index a95b9b9a..66a6dd8a 100644 --- a/task-runner/task_runner/file_manager.py +++ b/task-runner/task_runner/file_manager.py @@ -5,9 +5,11 @@ import urllib import urllib.request import uuid +import zlib import requests import tenacity +import urllib3 from typing_extensions import override import task_runner @@ -194,6 +196,37 @@ def upload_output( return size, zip_duration, upload_time + @staticmethod + def _download_file(url, + download_path, + pool_manager, + range_start=None, + range_end=None): + headers = {} + is_range = range_start is not None and range_end is not None + + if is_range: + headers["Range"] = f"bytes={range_start}-{range_end}" + response = pool_manager.urlopen("GET", + url, + headers=headers, + preload_content=False) + + compressed_path = download_path + ".tmp" + os.makedirs(os.path.dirname(compressed_path), exist_ok=True) + with open(compressed_path, "wb") as file: + for chunk in response.stream(): + file.write(chunk) + response.release_conn() + + with open(compressed_path, "rb") as f: + compressed_data = f.read() + decompressed_data = zlib.decompress(compressed_data, -zlib.MAX_WBITS) + + with open(download_path, "wb") as f: + f.write(decompressed_data) + os.remove(compressed_path) + @utils.execution_time @override def download_input_resources( @@ -202,21 +235,57 @@ def download_input_resources( dest_path: str, workdir: str, ): - files_url = self._api_client.get_download_urls(input_resources) - - for file_url in files_url: - url = file_url["url"] - base_path = file_url["file_path"] - unzip = file_url["unzip"] - file_path = os.path.join(dest_path, base_path) - os.makedirs(os.path.dirname(file_path), exist_ok=True) - urllib.request.urlretrieve(url, file_path) - - if unzip: - extract_to = os.path.join(dest_path, os.path.dirname(file_path)) - files.extract_subfolder_and_cleanup( - zip_path=file_path, - subfolder="artifacts/", - extract_to=extract_to, - workdir=workdir, + zip_paths = [] + normal_paths = [] + + for input_resource in input_resources: + normalized = input_resource.rstrip("/") + + if ".zip/" in normalized or normalized.endswith(".zip"): + zip_file, inner_path = normalized.split(".zip/", 1) + zip_paths.append({ + "zip_file": zip_file + ".zip", + "inner_path": inner_path + }) + else: + normal_paths.append(normalized) + + if zip_paths: + zip_urls = self._api_client.get_download_urls( + [z["zip_file"] for z in zip_paths]) + + pool_manager = urllib3.PoolManager() + for zip_path, zip_url in zip(zip_paths, zip_urls): + range_start, range_end, _ = self._api_client.get_zip_file_range( + filename=zip_path["inner_path"], + path=zip_path["zip_file"], + zip_relative_path="artifacts/", ) + self._download_file( + url=zip_url["url"], + download_path=os.path.join(dest_path, + zip_path["inner_path"]), + pool_manager=pool_manager, + range_start=range_start, + range_end=range_end, + ) + + if normal_paths: + files_url = self._api_client.get_download_urls(normal_paths) + for file_url in files_url: + url = file_url["url"] + base_path = file_url["file_path"] + unzip = file_url["unzip"] + file_path = os.path.join(dest_path, base_path) + os.makedirs(os.path.dirname(file_path), exist_ok=True) + urllib.request.urlretrieve(url, file_path) + + if unzip: + extract_to = os.path.join(dest_path, + os.path.dirname(file_path)) + files.extract_subfolder_and_cleanup( + zip_path=file_path, + subfolder="artifacts/", + extract_to=extract_to, + workdir=workdir, + )