7 changes: 4 additions & 3 deletions .github/workflows/test.yml
@@ -20,8 +20,8 @@ jobs:
env:
PORT: 8080
steps:
- name: "Checkout"
uses: "actions/checkout@v4"
- name: 'Checkout'
uses: 'actions/checkout@v4'
- name: Run Dataverse Action
id: dataverse
uses: gdcc/dataverse-action@main
@@ -38,6 +38,7 @@ jobs:
env:
API_TOKEN: ${{ steps.dataverse.outputs.api_token }}
BASE_URL: ${{ steps.dataverse.outputs.base_url }}
DVUPLOADER_TESTING: "true"
DVUPLOADER_TESTING: 'true'
TEST_ROWS: 100000
run: |
python3 -m poetry run pytest
56 changes: 56 additions & 0 deletions README.md
@@ -93,6 +93,7 @@ Alternatively, you can also supply a `config` file that contains all necessary i
* `mimetype`: Mimetype of the file.
* `categories`: Optional list of categories to assign to the file.
* `restrict`: Boolean to indicate that this is a restricted file. Defaults to False.
* `tabIngest`: Boolean to indicate whether the file should be ingested as tabular data. Defaults to True.

In the following example, we upload three files to a Dataverse instance. The first file is uploaded to the root directory of the dataset, while the other two files are uploaded to the directory `some/dir`.

@@ -115,6 +116,61 @@ The `config` file can then be used as follows:
dvuploader --config-path config.yml
```

### Environment variables

DVUploader reads several environment variables that control its retry logic and upload size limit. You can set them directly in your environment or programmatically via the `config` function.

**Available Environment Variables:**
- `DVUPLOADER_MAX_RETRIES`: Maximum number of retry attempts (default: 15)
- `DVUPLOADER_MAX_RETRY_TIME`: Maximum wait time between retries in seconds (default: 240)
- `DVUPLOADER_MIN_RETRY_TIME`: Minimum wait time between retries in seconds (default: 1)
- `DVUPLOADER_RETRY_MULTIPLIER`: Multiplier for exponential backoff (default: 0.1)
- `DVUPLOADER_MAX_PKG_SIZE`: Maximum package size in bytes (default: 2GB)

**Setting via environment:**
```bash
export DVUPLOADER_MAX_RETRIES=20
export DVUPLOADER_MAX_RETRY_TIME=300
export DVUPLOADER_MIN_RETRY_TIME=2
export DVUPLOADER_RETRY_MULTIPLIER=0.2
export DVUPLOADER_MAX_PKG_SIZE=3221225472 # 3GB
```

**Setting programmatically:**
```python
import dvuploader as dv

# Configure the uploader settings
dv.config(
max_retries=20,
max_retry_time=300,
min_retry_time=2,
retry_multiplier=0.2,
max_package_size=3 * 1024**3 # 3GB
)

# Continue with your upload as normal
files = [dv.File(filepath="./data.csv")]
dvuploader = dv.DVUploader(files=files)
# ... rest of your upload code
```

The retry logic uses exponential backoff, so each subsequent retry waits longer, but never longer than `max_retry_time`. This is particularly useful for native uploads, which may be subject to intermediate locks on the Dataverse side.
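For intuition, here is a minimal sketch of how that backoff schedule grows with the defaults listed above. It assumes tenacity-style exponential waits clamped between the minimum and maximum, and is an approximation rather than a reference for DVUploader's internals.

```python
def backoff_schedule(max_retries=15, multiplier=0.1, min_wait=1, max_wait=240):
    """Approximate wait times (in seconds) for each retry attempt."""
    waits = []
    for attempt in range(1, max_retries + 1):
        wait = multiplier * 2**attempt                    # exponential growth
        waits.append(max(min_wait, min(wait, max_wait)))  # clamp to [min, max]
    return waits

print(backoff_schedule())
# [1, 1, 1, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2, 102.4, 204.8, 240, 240, 240, 240]
```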

## Troubleshooting

### `500` error and `OptimisticLockException`

When uploading multiple tabular files, you might encounter a `500` error and an `OptimisticLockException` during the file registration step. This is discussed in https://github.com/IQSS/dataverse/issues/11265 and is caused by intermediate locks that prevent the file registration step from completing.

A workaround is to set the `tabIngest` flag to `False` for all files to be uploaded. The files will then not be ingested as tabular data, which avoids the intermediate locks.

```python
dv.File(filepath="tab_file.csv", tab_ingest=False)
```

Please be aware that your tabular files will then not be ingested as tabular data but uploaded in their native format. You can use [pyDataverse](https://github.com/gdcc/pyDataverse/blob/693d0ff8d2849eccc32f9e66228ee8976109881a/pyDataverse/api.py#L2475) to ingest the files after they have been uploaded.
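As a slightly fuller sketch, the workaround applied to a whole batch of tabular files could look like this (the file names are hypothetical):

```python
import dvuploader as dv

# Hypothetical tabular files; tab_ingest=False skips tabular ingest and
# thereby avoids the intermediate locks during file registration.
files = [
    dv.File(filepath=path, tab_ingest=False)
    for path in ["table_a.csv", "table_b.csv", "table_c.tsv"]
]

dvuploader = dv.DVUploader(files=files)
# ... continue with your upload as usual
```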

## Development

To install the development dependencies, run the following command:
1 change: 1 addition & 0 deletions dvuploader/__init__.py
@@ -1,6 +1,7 @@
from .dvuploader import DVUploader # noqa: F401
from .file import File # noqa: F401
from .utils import add_directory # noqa: F401
from .config import config # noqa: F401

import nest_asyncio

56 changes: 56 additions & 0 deletions dvuploader/config.py
@@ -0,0 +1,56 @@
import os


def config(
max_retries: int = 15,
max_retry_time: int = 240,
min_retry_time: int = 1,
retry_multiplier: float = 0.1,
max_package_size: int = 2 * 1024**3,
):
"""This function sets the environment variables for the dvuploader package.

Use this function to set the environment variables that control the behavior
of the dvuploader package. This is particularly useful when you want to relax
the retry logic and upload size limits.

Retry logic:
Native uploads in particular may be subject to intermediate locks
on the Dataverse side, which may cause the upload to fail. We provide
an exponential backoff mechanism to deal with this.

The exponential backoff is controlled by the following environment variables:
- DVUPLOADER_MAX_RETRIES: The maximum number of retries.
- DVUPLOADER_MAX_RETRY_TIME: The maximum retry time.
- DVUPLOADER_MIN_RETRY_TIME: The minimum retry time.
- DVUPLOADER_RETRY_MULTIPLIER: The retry multiplier.

The wait time for retry attempt n grows exponentially:
wait_time = retry_multiplier * 2^n

and is clamped between min_retry_time and max_retry_time.

Upload size:
The maximum upload size is controlled by the following environment variable:
- DVUPLOADER_MAX_PKG_SIZE: The maximum package size.

The default maximum package size is 2GB, but this can be changed by
setting the DVUPLOADER_MAX_PKG_SIZE environment variable.

We recommend not exceeding 2GB, as this is the maximum size supported
by Dataverse; beyond that, the risk of failure increases.

Args:
max_retries (int): The maximum number of retry attempts.
max_retry_time (int): The maximum wait time between retries, in seconds.
min_retry_time (int): The minimum wait time between retries, in seconds.
retry_multiplier (float): The multiplier for the exponential backoff.
max_package_size (int): The maximum package size, in bytes.
"""

os.environ["DVUPLOADER_MAX_RETRIES"] = str(max_retries)
os.environ["DVUPLOADER_MAX_RETRY_TIME"] = str(max_retry_time)
os.environ["DVUPLOADER_MIN_RETRY_TIME"] = str(min_retry_time)
os.environ["DVUPLOADER_RETRY_MULTIPLIER"] = str(retry_multiplier)
os.environ["DVUPLOADER_MAX_PKG_SIZE"] = str(max_package_size)
25 changes: 14 additions & 11 deletions dvuploader/directupload.py
@@ -337,7 +337,6 @@ async def _upload_multipart(
)

file.apply_checksum()
print(file.checksum)

return True, storage_identifier

@@ -556,17 +555,21 @@ async def _add_files_to_ds(
novel_json_data = _prepare_registration(files, use_replace=False)
replace_json_data = _prepare_registration(files, use_replace=True)

await _multipart_json_data_request(
session=session,
json_data=novel_json_data,
url=novel_url,
)
if novel_json_data:
# Register new files, if any
await _multipart_json_data_request(
session=session,
json_data=novel_json_data,
url=novel_url,
)

await _multipart_json_data_request(
session=session,
json_data=replace_json_data,
url=replace_url,
)
if replace_json_data:
# Register replacement files, if any
await _multipart_json_data_request(
session=session,
json_data=replace_json_data,
url=replace_url,
)

progress.update(pbar, advance=1)

3 changes: 2 additions & 1 deletion dvuploader/file.py
@@ -45,7 +45,7 @@ class File(BaseModel):
handler: Union[BytesIO, StringIO, IO, None] = Field(default=None, exclude=True)
description: str = ""
directory_label: str = Field(default="", alias="directoryLabel")
mimeType: str = "text/plain"
mimeType: str = "application/octet-stream"
categories: List[str] = ["DATA"]
restrict: bool = False
checksum_type: ChecksumTypes = Field(default=ChecksumTypes.MD5, exclude=True)
@@ -54,6 +54,7 @@
checksum: Optional[Checksum] = None
to_replace: bool = False
file_id: Optional[Union[str, int]] = Field(default=None, alias="fileToReplaceId")
tab_ingest: bool = Field(default=True, alias="tabIngest")

_size: int = PrivateAttr(default=0)

89 changes: 81 additions & 8 deletions dvuploader/nativeupload.py
@@ -1,9 +1,11 @@
import asyncio
from io import BytesIO
from pathlib import Path
import httpx
import json
import os
import tempfile
import rich
import tenacity
from typing import List, Optional, Tuple, Dict

@@ -13,12 +15,44 @@
from dvuploader.packaging import distribute_files, zip_files
from dvuploader.utils import build_url, retrieve_dataset_files

##### CONFIGURATION #####

# Retry configuration: the wait time between retries grows exponentially,
#   wait = RETRY_MULTIPLIER * 2^attempt,
# clamped to [MIN_RETRY_TIME, MAX_RETRY_TIME], so later retries wait longer,
# up to MAX_RETRY_TIME seconds per retry.
MAX_RETRIES = int(os.environ.get("DVUPLOADER_MAX_RETRIES", 15))
MAX_RETRY_TIME = int(os.environ.get("DVUPLOADER_MAX_RETRY_TIME", 60))
MIN_RETRY_TIME = int(os.environ.get("DVUPLOADER_MIN_RETRY_TIME", 1))
RETRY_MULTIPLIER = float(os.environ.get("DVUPLOADER_RETRY_MULTIPLIER", 0.1))
RETRY_STRAT = tenacity.wait_exponential(
multiplier=RETRY_MULTIPLIER,
min=MIN_RETRY_TIME,
max=MAX_RETRY_TIME,
)

assert isinstance(MAX_RETRIES, int), "DVUPLOADER_MAX_RETRIES must be an integer"
assert isinstance(MAX_RETRY_TIME, int), "DVUPLOADER_MAX_RETRY_TIME must be an integer"
assert isinstance(MIN_RETRY_TIME, int), "DVUPLOADER_MIN_RETRY_TIME must be an integer"
assert isinstance(RETRY_MULTIPLIER, float), (
"DVUPLOADER_RETRY_MULTIPLIER must be a float"
)

##### END CONFIGURATION #####

NATIVE_UPLOAD_ENDPOINT = "/api/datasets/:persistentId/add"
NATIVE_REPLACE_ENDPOINT = "/api/files/{FILE_ID}/replace"
NATIVE_METADATA_ENDPOINT = "/api/files/{FILE_ID}/metadata"

assert isinstance(MAX_RETRIES, int), "DVUPLOADER_MAX_RETRIES must be an integer"
TABULAR_EXTENSIONS = [
"csv",
"tsv",
]

##### ERROR MESSAGES #####

ZIP_LIMIT_MESSAGE = "The number of files in the zip archive is over the limit"


async def native_upload(
@@ -174,8 +208,9 @@ def _reset_progress(


@tenacity.retry(
wait=tenacity.wait_fixed(0.5),
wait=RETRY_STRAT,
stop=tenacity.stop_after_attempt(MAX_RETRIES),
retry=tenacity.retry_if_exception_type((httpx.HTTPStatusError,)),
)
async def _single_native_upload(
session: httpx.AsyncClient,
@@ -213,7 +248,11 @@ async def _single_native_upload(
json_data = _get_json_data(file)

files = {
"file": (file.file_name, file.handler, file.mimeType),
"file": (
file.file_name,
file.handler,
file.mimeType,
),
"jsonData": (
None,
BytesIO(json.dumps(json_data).encode()),
@@ -226,9 +265,20 @@
files=files, # type: ignore
)

if response.status_code == 400 and response.json()["message"].startswith(
ZIP_LIMIT_MESSAGE
):
# Explicitly handle the zip limit error, because otherwise we will run into
# unnecessary retries.
raise ValueError(
f"Could not upload file '{file.file_name}' due to zip limit:\n{response.json()['message']}"
)

# Any other error is re-raised and the error will be handled by the retry logic.
response.raise_for_status()

if response.status_code == 200:
# If we did well, update the progress bar.
progress.update(pbar, advance=file._size, complete=file._size)

# Wait to avoid rate limiting
@@ -238,7 +288,6 @@

# Wait to avoid rate limiting
await asyncio.sleep(1.0)

return False, {"message": "Failed to upload file"}


@@ -294,14 +343,23 @@ async def _update_metadata(
dv_path = os.path.join(file.directory_label, file.file_name) # type: ignore

try:
file_id = file_mapping[dv_path]
if _tab_extension(dv_path) in file_mapping:
file_id = file_mapping[_tab_extension(dv_path)]
elif file.file_name and _is_zip(file.file_name):
# When the file is a zip it will be unpacked and thus
# the expected file name of the zip will not be in the
# dataset, since it has been unpacked.
continue
else:
file_id = file_mapping[dv_path]
except KeyError:
raise ValueError(
rich.print(
(
f"File {dv_path} not found in Dataverse repository.",
"This may be due to the file not being uploaded to the repository.",
"This may be due to the file not being uploaded to the repository:",
)
)
continue

task = _update_single_metadata(
session=session,
@@ -315,7 +373,7 @@ async def _update_metadata(


@tenacity.retry(
wait=tenacity.wait_fixed(0.3),
wait=RETRY_STRAT,
stop=tenacity.stop_after_attempt(MAX_RETRIES),
)
async def _update_single_metadata(
@@ -356,6 +414,7 @@ async def _update_single_metadata(
if response.status_code == 200:
return
else:
print(response.json())
await asyncio.sleep(1.0)

raise ValueError(f"Failed to update metadata for file {file.file_name}.")
@@ -407,3 +466,17 @@ def _create_file_id_path_mapping(files):
mapping[path] = file["id"]

return mapping


def _tab_extension(path: str) -> str:
"""
Returns the path with its extension replaced by the tabular (.tab) extension.
"""
return str(Path(path).with_suffix(".tab"))


def _is_zip(file_name: str) -> bool:
"""
Checks if a file name ends with a zip extension.
"""
return file_name.endswith(".zip")