Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions task-runner/task_runner/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,37 @@ def get_signed_urls(
)
return resp.json()

def get_zip_file_range(
self,
filename: str,
path: str,
zip_relative_path: Optional[str] = None,
):
"""
Get the byte range (start and end) of a file inside a ZIP archive.

:param filename: The file inside the ZIP.
:param path: Full path of the ZIP file in storage.
:param zip_relative_path: Relative path inside the zip.
"""
params = {"filename": filename, "path": path}
if zip_relative_path is not None:
params["zip_relative_path"] = zip_relative_path

resp = self._request(
method="GET",
path="/storage/zip-file-range",
raise_exception=True,
attempts=HTTP_REQUEST_MAX_ATTEMPTS,
params=params,
)
data = resp.json()
return (
data["range_start"],
data["range_end"],
data["compress_type"],
)

def get_download_input_url(self, storage_dir: str) -> str:
return self.get_signed_urls(
paths=[f"{storage_dir}/{INPUT_ZIP_FILENAME}"],
Expand Down
103 changes: 86 additions & 17 deletions task-runner/task_runner/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import urllib
import urllib.request
import uuid
import zlib

import requests
import tenacity
import urllib3
from typing_extensions import override

import task_runner
Expand Down Expand Up @@ -194,6 +196,37 @@ def upload_output(

return size, zip_duration, upload_time

@staticmethod
def _download_file(url,
download_path,
pool_manager,
range_start=None,
range_end=None):
headers = {}
is_range = range_start is not None and range_end is not None

if is_range:
headers["Range"] = f"bytes={range_start}-{range_end}"
response = pool_manager.urlopen("GET",
url,
headers=headers,
preload_content=False)

compressed_path = download_path + ".tmp"
os.makedirs(os.path.dirname(compressed_path), exist_ok=True)
with open(compressed_path, "wb") as file:
for chunk in response.stream():
file.write(chunk)
response.release_conn()

with open(compressed_path, "rb") as f:
compressed_data = f.read()
decompressed_data = zlib.decompress(compressed_data, -zlib.MAX_WBITS)

with open(download_path, "wb") as f:
f.write(decompressed_data)
os.remove(compressed_path)

@utils.execution_time
@override
def download_input_resources(
Expand All @@ -202,21 +235,57 @@ def download_input_resources(
dest_path: str,
workdir: str,
):
files_url = self._api_client.get_download_urls(input_resources)

for file_url in files_url:
url = file_url["url"]
base_path = file_url["file_path"]
unzip = file_url["unzip"]
file_path = os.path.join(dest_path, base_path)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
urllib.request.urlretrieve(url, file_path)

if unzip:
extract_to = os.path.join(dest_path, os.path.dirname(file_path))
files.extract_subfolder_and_cleanup(
zip_path=file_path,
subfolder="artifacts/",
extract_to=extract_to,
workdir=workdir,
zip_paths = []
normal_paths = []

for input_resource in input_resources:
normalized = input_resource.rstrip("/")

if ".zip/" in normalized or normalized.endswith(".zip"):
zip_file, inner_path = normalized.split(".zip/", 1)
zip_paths.append({
"zip_file": zip_file + ".zip",
"inner_path": inner_path
})
else:
normal_paths.append(normalized)

if zip_paths:
zip_urls = self._api_client.get_download_urls(
[z["zip_file"] for z in zip_paths])

pool_manager = urllib3.PoolManager()
for zip_path, zip_url in zip(zip_paths, zip_urls):
range_start, range_end, _ = self._api_client.get_zip_file_range(
filename=zip_path["inner_path"],
path=zip_path["zip_file"],
zip_relative_path="artifacts/",
)
self._download_file(
url=zip_url["url"],
download_path=os.path.join(dest_path,
zip_path["inner_path"]),
pool_manager=pool_manager,
range_start=range_start,
range_end=range_end,
)

if normal_paths:
files_url = self._api_client.get_download_urls(normal_paths)
for file_url in files_url:
url = file_url["url"]
base_path = file_url["file_path"]
unzip = file_url["unzip"]
file_path = os.path.join(dest_path, base_path)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
urllib.request.urlretrieve(url, file_path)

if unzip:
extract_to = os.path.join(dest_path,
os.path.dirname(file_path))
files.extract_subfolder_and_cleanup(
zip_path=file_path,
subfolder="artifacts/",
extract_to=extract_to,
workdir=workdir,
)