-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfritsander_api_knmi.py
More file actions
183 lines (147 loc) · 6.1 KB
/
fritsander_api_knmi.py
File metadata and controls
183 lines (147 loc) · 6.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
import requests
from requests import Session
# DOESNT WORK
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))
def download_dataset_file(
session: Session,
base_url: str,
dataset_name: str,
dataset_version: str,
filename: str,
directory: str,
overwrite: bool,
) -> tuple[bool, str]:
# if a file from this dataset already exists, skip downloading it.
file_path = Path(directory, filename).resolve()
if not overwrite and file_path.exists():
logger.info(f"Dataset file '{filename}' was already downloaded.")
return True, filename
endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
get_file_response = session.get(endpoint)
# retrieve download URL for dataset file
if get_file_response.status_code != 200:
logger.warning(f"Unable to get file: {filename}")
logger.warning(get_file_response.content)
return False, filename
# use download URL to GET dataset file. We don't need to set the 'Authorization' header,
# The presigned download URL already has permissions to GET the file contents
download_url = get_file_response.json().get("temporaryDownloadUrl")
return download_file_from_temporary_download_url(download_url, directory, filename)
def download_file_from_temporary_download_url(download_url, directory, filename):
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(f"{directory}/{filename}", "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except Exception:
logger.exception("Unable to download file using download URL")
return False, filename
logger.info(f"Downloaded dataset file '{filename}'")
return True, filename
def list_dataset_files(
session: Session,
base_url: str,
dataset_name: str,
dataset_version: str,
params: dict[str, str],
) -> tuple[list[str], dict[str, Any]]:
logger.info(f"Retrieve dataset files with query params: {params}")
list_files_endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files"
list_files_response = session.get(list_files_endpoint, params=params)
print (list_files_response.status_code)
print (list_files_endpoint)
if list_files_response.status_code != 200:
raise Exception("Unable to list initial dataset files")
try:
list_files_response_json = list_files_response.json()
dataset_files = list_files_response_json.get("files")
dataset_filenames = list(map(lambda x: x.get("filename"), dataset_files))
return dataset_filenames, list_files_response_json
except Exception as e:
logger.exception(e)
raise Exception(e)
def get_max_worker_count(filesizes):
size_for_threading = 10_000_000 # 10 MB
average = sum(filesizes) / len(filesizes)
# to prevent downloading multiple half files in case of a network failure with big files
if average > size_for_threading:
threads = 1
else:
threads = 10
return threads
async def main():
api_key = "#####"
dataset_name = "KNMI_precipsum24h_unvalidated"
dataset_version = "2"
base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
# When set to True, if a file with the same name exists the output is written over the file.
# To prevent unnecessary bandwidth usage, leave it set to False.
overwrite = False
download_directory = "dataset-download"
# Make sure to send the API key with every HTTP request
session = requests.Session()
session.headers.update({"Authorization": api_key})
# Verify that the download directory exists
if not Path(download_directory).is_dir() or not Path(download_directory).exists():
raise Exception(f"Invalid or non-existing directory: {download_directory}")
filenames = []
max_keys = 500
next_page_token = None
file_sizes = []
# Use the API to get a list of all dataset filenames
while True:
# Retrieve dataset files after given filename
dataset_filenames, response_json = list_dataset_files(
session,
base_url,
dataset_name,
dataset_version,
{"maxKeys": f"{max_keys}", "nextPageToken": next_page_token},
)
file_sizes.extend(file["size"] for file in response_json.get("files"))
# Store filenames
filenames += dataset_filenames
# If the result is not truncated, we retrieved all filenames
next_page_token = response_json.get("nextPageToken")
if not next_page_token:
logger.info("Retrieved names of all dataset files")
break
logger.info(f"Number of files to download: {len(filenames)}")
worker_count = get_max_worker_count(file_sizes)
loop = asyncio.get_event_loop()
# Allow up to 10 separate threads to download dataset files concurrently
executor = ThreadPoolExecutor(max_workers=worker_count)
futures = []
# Create tasks that download the dataset files
for dataset_filename in filenames:
# Create future for dataset file
future = loop.run_in_executor(
executor,
download_dataset_file,
session,
base_url,
dataset_name,
dataset_version,
dataset_filename,
download_directory,
overwrite,
)
futures.append(future)
# # Wait for all tasks to complete and gather the results
future_results = await asyncio.gather(*futures)
logger.info(f"Finished '{dataset_name}' dataset download")
failed_downloads = list(filter(lambda x: not x[0], future_results))
if len(failed_downloads) > 0:
logger.warning("Failed to download the following dataset files:")
logger.warning(list(map(lambda x: x[1], failed_downloads)))
if __name__ == "__main__":
asyncio.run(main())