10 changes: 5 additions & 5 deletions README.md
@@ -48,8 +48,8 @@ import dvuploader as dv
 # Add file individually
 files = [
     dv.File(filepath="./small.txt"),
-    dv.File(directoryLabel="some/dir", filepath="./medium.txt"),
-    dv.File(directoryLabel="some/dir", filepath="./big.txt"),
+    dv.File(directory_label="some/dir", filepath="./medium.txt"),
+    dv.File(directory_label="some/dir", filepath="./big.txt"),
     *dv.add_directory("./data"),  # Add an entire directory
 ]

@@ -88,7 +88,7 @@ Alternatively, you can also supply a `config` file that contains all necessary information
 * `api_token`: API token of the Dataverse instance.
 * `files`: List of files to upload. Each file is a dictionary with the following keys:
   * `filepath`: Path to the file to upload.
-  * `directoryLabel`: Optional directory label to upload the file to.
+  * `directory_label`: Optional directory label to upload the file to.
   * `description`: Optional description of the file.
   * `mimetype`: Mimetype of the file.
   * `categories`: Optional list of categories to assign to the file.
@@ -104,9 +104,9 @@ api_token: XXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
 files:
   - filepath: ./small.txt
   - filepath: ./medium.txt
-    directoryLabel: some/dir
+    directory_label: some/dir
   - filepath: ./big.txt
-    directoryLabel: some/dir
+    directory_label: some/dir
 ```

The `config` file can then be used as follows:
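The invocation itself is collapsed in this view. As a stand-in, here is a minimal sketch of consuming such a config from Python; the YAML parsing and the `dv.File` keyword usage are assumptions based only on the keys documented above, not lines from this PR:

```python
import yaml  # assumption: config files are plain YAML (pip install pyyaml)

import dvuploader as dv

# Parse the config shown above and build File objects with the
# renamed snake_case key.
with open("config.yml") as f:
    config = yaml.safe_load(f)

files = [
    dv.File(
        filepath=entry["filepath"],
        # Optional per the docs above; assumed to default to None.
        directory_label=entry.get("directory_label"),
    )
    for entry in config["files"]
]
```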
37 changes: 25 additions & 12 deletions dvuploader/checksum.py
@@ -1,7 +1,7 @@
 import hashlib
 from enum import Enum
 import os
-from typing import Callable
+from typing import IO, Callable
 
 from pydantic import BaseModel, ConfigDict, Field
 
@@ -34,34 +34,41 @@ class Checksum(BaseModel):
         value (str): The value of the checksum.
     """
 
-    model_config = ConfigDict(populate_by_name=True)
+    model_config = ConfigDict(
+        arbitrary_types_allowed=True,
+        populate_by_name=True,
+    )
 
     type: str = Field(..., alias="@type")
     value: str = Field(..., alias="@value")
 
     @classmethod
     def from_file(
         cls,
-        fpath: str,
+        handler: IO,
         hash_fun: Callable,
         hash_algo: str,
     ) -> "Checksum":
-        """Takes a file path and returns a checksum object.
+        """Takes a file handler and returns a checksum object.
 
         Args:
-            fpath (str): The file path to generate the checksum for.
+            handler (IO): The file handler to generate the checksum for.
             hash_fun (Callable): The hash function to use for generating the checksum.
             hash_algo (str): The hash algorithm to use for generating the checksum.
 
         Returns:
             Checksum: A Checksum object with type and value fields.
         """
 
-        value = cls._chunk_checksum(fpath=fpath, hash_fun=hash_fun)
+        value = cls._chunk_checksum(handler=handler, hash_fun=hash_fun)
         return cls(type=hash_algo, value=value)  # type: ignore
 
     @staticmethod
-    def _chunk_checksum(fpath: str, hash_fun: Callable, blocksize=2**20) -> str:
+    def _chunk_checksum(
+        handler: IO,
+        hash_fun: Callable,
+        blocksize=2**20,
+    ) -> str:
         """Chunks a file and returns a checksum.
 
         Args:
@@ -73,10 +80,16 @@ def _chunk_checksum(fpath: str, hash_fun: Callable, blocksize=2**20) -> str:
             str: A string representing the checksum of the file.
         """
         m = hash_fun()
-        with open(fpath, "rb") as f:
-            while True:
-                buf = f.read(blocksize)
-                if not buf:
-                    break
-                m.update(buf)
+        while True:
+            buf = handler.read(blocksize)
+
+            if not isinstance(buf, bytes):
+                buf = buf.encode()
+
+            if not buf:
+                break
+            m.update(buf)
+
+        handler.seek(0)
 
         return m.hexdigest()
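As a quick sanity check of the reworked API, here is a hedged usage sketch covering both disk-backed and in-memory handlers; the `hashlib.md5` / `"md5"` pairing and the local file path are illustrative, not taken from this diff:

```python
import hashlib
import io

from dvuploader.checksum import Checksum

# Disk-backed handler: open in binary mode so read() yields bytes directly.
# Assumes ./small.txt exists locally.
with open("./small.txt", "rb") as handler:
    checksum = Checksum.from_file(
        handler=handler,
        hash_fun=hashlib.md5,
        hash_algo="md5",
    )

# An in-memory text handler also works: _chunk_checksum encodes str chunks
# to bytes before hashing and seeks the handler back to 0 afterwards, so
# the same handler remains consumable by the subsequent upload step.
buffer = io.StringIO("col1,col2\n1,2\n")
checksum = Checksum.from_file(handler=buffer, hash_fun=hashlib.md5, hash_algo="md5")
```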
58 changes: 34 additions & 24 deletions dvuploader/directupload.py
@@ -75,7 +75,7 @@ async def direct_upload(
         if status is True:
             continue
 
-        print(f"❌ Failed to upload file '{file.fileName}' to the S3 storage")
+        print(f"❌ Failed to upload file '{file.file_name}' to the S3 storage")
 
     headers = {
         "X-Dataverse-key": api_token,
@@ -129,23 +129,19 @@ async def _upload_to_store(
 
     await asyncio.sleep(delay)
 
-    assert file.fileName is not None, "File name is None"
-    assert os.path.exists(file.filepath), f"File {file.filepath} does not exist"
-
-    file_size = os.path.getsize(file.filepath)
     ticket = await _request_ticket(
         session=session,
         dataverse_url=dataverse_url,
         api_token=api_token,
-        file_size=file_size,
+        file_size=file._size,
         persistent_id=persistent_id,
     )
 
     if not "urls" in ticket:
         status, storage_identifier = await _upload_singlepart(
             session=session,
             ticket=ticket,
-            filepath=file.filepath,
+            file=file,
             pbar=pbar,
             progress=progress,
             api_token=api_token,
@@ -156,7 +152,7 @@ async def _upload_to_store(
         status, storage_identifier = await _upload_multipart(
             session=session,
             response=ticket,
-            filepath=file.filepath,
+            file=file,
             dataverse_url=dataverse_url,
             pbar=pbar,
             progress=progress,
@@ -207,7 +203,7 @@ async def _request_ticket(
 async def _upload_singlepart(
     session: aiohttp.ClientSession,
     ticket: Dict,
-    filepath: str,
+    file: File,
     pbar,
     progress,
     api_token: str,
@@ -241,21 +237,16 @@ async def _upload_singlepart(
     params = {
         "headers": headers,
         "url": ticket["url"],
-        "data": open(filepath, "rb"),
+        "data": file.handler,
     }
 
     async with session.put(**params) as response:
         status = response.status == 200
         response.raise_for_status()
 
         if status:
-            progress.update(
-                pbar,
-                advance=os.path.getsize(filepath),
-            )
-
+            progress.update(pbar, advance=file._size)
             await asyncio.sleep(0.1)
 
         progress.update(
             pbar,
             visible=leave_bar,
@@ -267,7 +258,7 @@ async def _upload_singlepart(
 async def _upload_multipart(
     session: aiohttp.ClientSession,
     response: Dict,
-    filepath: str,
+    file: File,
     dataverse_url: str,
     pbar,
     progress,
@@ -279,7 +270,7 @@ async def _upload_multipart(
     Args:
         session (aiohttp.ClientSession): The aiohttp client session.
         response (Dict): The response from the Dataverse API containing the upload ticket information.
-        filepath (str): The path to the file to be uploaded.
+        file (File): The file object to be uploaded.
         dataverse_url (str): The URL of the Dataverse instance.
         pbar (tqdm): A progress bar to track the upload progress.
         progress: The progress callback function.
@@ -301,19 +292,20 @@ async def _upload_multipart(
 
     try:
         e_tags = await _chunked_upload(
-            filepath=filepath,
+            file=file,
             session=session,
             urls=urls,
             chunk_size=chunk_size,
             pbar=pbar,
             progress=progress,
         )
     except Exception as e:
-        print(f"❌ Failed to upload file '{filepath}' to the S3 storage")
+        print(f"❌ Failed to upload file '{file.file_name}' to the S3 storage")
         await _abort_upload(
             session=session,
             url=abort,
             dataverse_url=dataverse_url,
+            api_token=api_token,
         )
         raise e
 
@@ -329,7 +321,7 @@ async def _upload_multipart(
 
 
 async def _chunked_upload(
-    filepath: str,
+    file: File,
     session: aiohttp.ClientSession,
     urls,
     chunk_size: int,
@@ -340,7 +332,7 @@ async def _chunked_upload(
     Uploads a file in chunks to multiple URLs using the provided session.
 
     Args:
-        filepath (str): The path of the file to upload.
+        file (File): The file object to upload.
         session (aiohttp.ClientSession): The aiohttp client session to use for the upload.
         urls: An iterable of URLs to upload the file chunks to.
         chunk_size (int): The size of each chunk in bytes.
@@ -351,7 +343,17 @@
         List[str]: A list of ETags returned by the server for each uploaded chunk.
     """
     e_tags = []
-    async with aiofiles.open(filepath, "rb") as f:
+
+    if not os.path.exists(file.filepath):
+        raise NotImplementedError(
+            """
+            Multipart chunked upload is currently only supported for local
+            files, not for in-memory objects. Please save the handler's
+            content to a local file and try again.
+            """
+        )
+
+    async with aiofiles.open(file.filepath, "rb") as f:
         chunk = await f.read(chunk_size)
         e_tags.append(
             await _upload_chunk(
@@ -459,6 +461,7 @@ async def _abort_upload(
     session: aiohttp.ClientSession,
     url: str,
     dataverse_url: str,
+    api_token: str,
 ):
     """
     Aborts an ongoing upload by sending a DELETE request to the specified URL.
@@ -467,11 +470,17 @@
         session (aiohttp.ClientSession): The aiohttp client session.
         url (str): The URL to send the DELETE request to.
         dataverse_url (str): The base URL of the Dataverse instance.
+        api_token (str): The API token to use for the request.
 
     Raises:
         aiohttp.ClientResponseError: If the DELETE request fails.
     """
-    async with session.delete(urljoin(dataverse_url, url)) as response:
+
+    headers = {
+        "X-Dataverse-key": api_token,
+    }
+
+    async with session.delete(urljoin(dataverse_url, url), headers=headers) as response:
         response.raise_for_status()


@@ -563,6 +572,7 @@ async def _multipart_json_data_request(
     Returns:
         None
     """
+
     with aiohttp.MultipartWriter("form-data") as writer:
         json_part = writer.append(json_data)
         json_part.set_content_disposition("form-data", name="jsonData")
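
One behavioral consequence of the new guard in `_chunked_upload`: in-memory handlers whose content exceeds the single-part limit must be written to disk before upload. A minimal sketch of the workaround the error message suggests, with the temp-file handling being illustrative rather than part of this PR:

```python
import io
import tempfile

import dvuploader as dv

# Stand-in for an in-memory object too large for a single-part upload.
payload = io.BytesIO(b"\x00" * (100 * 1024**2))

# _chunked_upload raises NotImplementedError for handlers without a real
# path on disk, so spill the content to a temporary file first and point
# dvuploader at the resulting path.
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp:
    tmp.write(payload.getbuffer())
    local_path = tmp.name

files = [dv.File(filepath=local_path, directory_label="some/dir")]
```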