Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ dependencies = [
"mysqlclient",
"python_dotenv",
"xmltodict",
"pyarrow>=14.0",
"boto3>=1.34",
"python-multipart>=0.0.9",
]

[project.optional-dependencies]
Expand Down
4 changes: 4 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ def load_database_configuration(file: Path = _config_file) -> TomlTable:

def load_configuration(file: Path = _config_file) -> TomlTable:
    """Read *file* and parse its full contents as a TOML table."""
    raw_text = file.read_text()
    return tomllib.loads(raw_text)


def load_minio_configuration(file: Path = _config_file) -> TomlTable:
    """Return the ``[minio]`` table from *file*, or an empty table if the section is absent."""
    return typing.cast("TomlTable", _load_configuration(file).get("minio", {}))
6 changes: 6 additions & 0 deletions src/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ database="openml"
[routing]
minio_url="http://minio:9000/"
server_url="http://php-api:80/"

[minio]
endpoint_url="http://minio:9000"
bucket="datasets"
# Credentials must be provided via environment variables:
# OPENML_MINIO_ACCESS_KEY and OPENML_MINIO_SECRET_KEY
89 changes: 89 additions & 0 deletions src/core/parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

import hashlib
import io
from dataclasses import dataclass, field

import pyarrow as pa
import pyarrow.parquet as pq

from schemas.datasets.openml import FeatureType

__all__ = [
"ColumnMeta",
"FeatureType",
"ParquetMeta",
"map_arrow_type",
"read_parquet_metadata",
]


def map_arrow_type(arrow_type: pa.DataType) -> FeatureType:
    """Map a PyArrow DataType to an OpenML FeatureType.

    Floating/integer/decimal types become NUMERIC, booleans and
    dictionary-encoded (categorical) types become NOMINAL, and
    everything else falls back to STRING.
    """
    is_numeric = (
        pa.types.is_floating(arrow_type)
        or pa.types.is_integer(arrow_type)
        or pa.types.is_decimal(arrow_type)
    )
    if is_numeric:
        return FeatureType.NUMERIC
    if pa.types.is_boolean(arrow_type):
        return FeatureType.NOMINAL
    if pa.types.is_dictionary(arrow_type):
        return FeatureType.NOMINAL
    return FeatureType.STRING


@dataclass
class ColumnMeta:
    """Schema and quality metadata for a single Parquet column."""

    index: int  # zero-based position of the column in the schema
    name: str  # column name taken from the Arrow schema
    data_type: FeatureType  # OpenML feature type produced by map_arrow_type
    number_of_missing_values: int  # null count observed in the column data


@dataclass
class ParquetMeta:
    """File-level metadata extracted from an uploaded Parquet file."""

    num_rows: int  # total row count reported by the Parquet footer
    num_columns: int  # number of columns in the schema
    md5_checksum: str  # MD5 hex digest of the raw file bytes (integrity check only)
    columns: list[ColumnMeta] = field(default_factory=list)  # per-column details


def read_parquet_metadata(file_bytes: bytes) -> ParquetMeta:
    """Parse *file_bytes* as Parquet and extract schema / quality metadata.

    Returns a ``ParquetMeta`` carrying the row and column counts, an MD5
    checksum of the raw bytes, and per-column type / null-count data.

    Raises ``ValueError`` if the bytes are not a valid Parquet file.
    """
    try:
        buf = io.BytesIO(file_bytes)
        pf = pq.ParquetFile(buf)
    except Exception as exc:
        msg = "File is not a valid Parquet file."
        raise ValueError(msg) from exc

    schema = pf.schema_arrow
    num_rows = pf.metadata.num_rows
    # Checksum is for integrity only, hence usedforsecurity=False.
    md5 = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest()

    # Read full table once to count per-column nulls.
    # NOTE(review): this materialises the whole dataset in memory; fine for
    # upload-sized files, but very large files may need a row-group loop.
    table = pf.read()

    columns: list[ColumnMeta] = []
    # Access columns positionally: name-based lookup (table.column(name))
    # raises for Parquet files that contain duplicate column names.
    for idx, arrow_field in enumerate(schema):
        col = table.column(idx)
        columns.append(
            ColumnMeta(
                index=idx,
                name=arrow_field.name,
                data_type=map_arrow_type(arrow_field.type),
                number_of_missing_values=col.null_count,
            ),
        )

    return ParquetMeta(
        num_rows=num_rows,
        num_columns=len(columns),
        md5_checksum=md5,
        columns=columns,
    )
67 changes: 67 additions & 0 deletions src/core/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

import io
import logging
import os
from typing import TYPE_CHECKING

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from config import _config_file, _load_configuration

if TYPE_CHECKING:
from pathlib import Path

logger = logging.getLogger(__name__)

MINIO_ACCESS_KEY_ENV = "OPENML_MINIO_ACCESS_KEY"
MINIO_SECRET_KEY_ENV = "OPENML_MINIO_SECRET_KEY" # noqa: S105


def _minio_config(file: Path = _config_file) -> dict[str, str]:
    """Resolve MinIO connection settings from the config file and environment.

    Credentials are read from the environment first, falling back to the
    config file; raises ``RuntimeError`` when neither supplies them.
    """
    minio_section = _load_configuration(file).get("minio", {})
    access_key = os.environ.get(MINIO_ACCESS_KEY_ENV) or minio_section.get("access_key", "")
    secret_key = os.environ.get(MINIO_SECRET_KEY_ENV) or minio_section.get("secret_key", "")
    if not (access_key and secret_key):
        msg = (
            f"MinIO credentials not found. Set {MINIO_ACCESS_KEY_ENV} and "
            f"{MINIO_SECRET_KEY_ENV} environment variables."
        )
        raise RuntimeError(msg)
    return {
        "endpoint_url": minio_section.get("endpoint_url", "http://minio:9000"),
        "bucket": minio_section.get("bucket", "datasets"),
        "access_key": access_key,
        "secret_key": secret_key,
    }


def _object_key(dataset_id: int) -> str:
"""Return the MinIO object key for a dataset, matching the existing URL pattern."""
ten_thousands_prefix = f"{dataset_id // 10_000:04d}"
padded_id = f"{dataset_id:04d}"
return f"datasets/{ten_thousands_prefix}/{padded_id}/dataset_{dataset_id}.pq"


def upload_to_minio(file_bytes: bytes, dataset_id: int) -> str:
    """Upload *file_bytes* to MinIO and return the object key.

    Raises ``RuntimeError`` on upload failure so callers can convert to HTTP 500.
    """
    settings = _minio_config()
    object_key = _object_key(dataset_id)
    try:
        s3 = boto3.client(
            "s3",
            endpoint_url=settings["endpoint_url"],
            aws_access_key_id=settings["access_key"],
            aws_secret_access_key=settings["secret_key"],
        )
        s3.upload_fileobj(io.BytesIO(file_bytes), settings["bucket"], object_key)
        logger.info("Uploaded dataset %d to MinIO at key '%s'", dataset_id, object_key)
    except (BotoCoreError, ClientError) as exc:
        msg = f"Failed to upload dataset {dataset_id} to MinIO: {exc}"
        logger.exception(msg)
        raise RuntimeError(msg) from exc
    return object_key
161 changes: 161 additions & 0 deletions src/database/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,164 @@ def remove_deactivated_status(dataset_id: int, connection: Connection) -> None:
),
parameters={"data": dataset_id},
)


def insert_file(
    *,
    file_name: str,
    reference: str,
    md5_hash: str,
    connection: Connection,
) -> int:
    """Insert a row into the `file` table and return the new file id.

    `reference` is the MinIO object key; `md5_hash` is the checksum of the
    uploaded bytes.
    """
    result = connection.execute(
        text(
            """
            INSERT INTO file(`name`, `reference`, `md5_hash`)
            VALUES (:name, :reference, :md5_hash)
            """,
        ),
        parameters={"name": file_name, "reference": reference, "md5_hash": md5_hash},
    )
    # ``lastrowid`` returns the id generated by this INSERT on MySQL drivers,
    # avoiding a second ``SELECT LAST_INSERT_ID()`` round-trip.
    return int(result.lastrowid)


def update_file_reference(
    *,
    file_id: int,
    reference: str,
    connection: Connection,
) -> None:
    """Point an existing `file` row at a new MinIO object key."""
    statement = text("UPDATE file SET `reference` = :reference WHERE `id` = :file_id")
    connection.execute(
        statement,
        parameters={"file_id": file_id, "reference": reference},
    )


def insert_dataset(  # noqa: PLR0913
    *,
    name: str,
    description: str,
    format_: str,
    file_id: int,
    uploader: int,
    visibility: str,
    licence: str,
    language: str,
    default_target_attribute: str,
    original_data_url: str,
    paper_url: str,
    collection_date: str,
    citation: str,
    md5_checksum: str,
    connection: Connection,
) -> int:
    """Insert a row into the `dataset` table and return the new dataset id.

    The row is always created as `version` 1 with `upload_date` set to NOW();
    all other column values come straight from the keyword arguments.
    """
    result = connection.execute(
        text(
            """
            INSERT INTO dataset(
                `name`, `description`, `format`, `file_id`, `uploader`,
                `visibility`, `licence`, `language`,
                `default_target_attribute`, `original_data_url`, `paper_url`,
                `collection_date`, `citation`, `md5_checksum`,
                `version`, `upload_date`
            )
            VALUES (
                :name, :description, :format, :file_id, :uploader,
                :visibility, :licence, :language,
                :default_target_attribute, :original_data_url, :paper_url,
                :collection_date, :citation, :md5_checksum,
                1, NOW()
            )
            """,
        ),
        parameters={
            "name": name,
            "description": description,
            "format": format_,
            "file_id": file_id,
            "uploader": uploader,
            "visibility": visibility,
            "licence": licence,
            "language": language,
            "default_target_attribute": default_target_attribute,
            "original_data_url": original_data_url,
            "paper_url": paper_url,
            "collection_date": collection_date,
            "citation": citation,
            "md5_checksum": md5_checksum,
        },
    )
    # ``lastrowid`` returns the id generated by this INSERT on MySQL drivers,
    # avoiding a second ``SELECT LAST_INSERT_ID()`` round-trip.
    return int(result.lastrowid)


def insert_description(
    *,
    dataset_id: int,
    description: str,
    connection: Connection,
) -> None:
    """Store the first (version 1) description row for a newly created dataset."""
    params = {"did": dataset_id, "description": description}
    connection.execute(
        text(
            """
            INSERT INTO dataset_description(`did`, `description`, `version`)
            VALUES (:did, :description, 1)
            """,
        ),
        parameters=params,
    )


def insert_features(
    *,
    dataset_id: int,
    features: list[dict[str, object]],
    connection: Connection,
) -> None:
    """Bulk-insert feature rows into `data_feature` in a single round-trip."""
    if not features:
        # Nothing to write; avoid issuing an empty executemany.
        return
    rows = [{"did": dataset_id, **feature} for feature in features]
    connection.execute(
        text(
            """
            INSERT INTO data_feature(
                `did`, `index`, `name`, `data_type`,
                `is_target`, `is_row_identifier`, `is_ignore`,
                `NumberOfMissingValues`
            )
            VALUES (
                :did, :index, :name, :data_type,
                :is_target, :is_row_identifier, :is_ignore,
                :number_of_missing_values
            )
            """,
        ),
        rows,
    )


def insert_qualities(
    *,
    dataset_id: int,
    qualities: list[dict[str, object]],
    connection: Connection,
) -> None:
    """Bulk-insert quality rows into `data_quality` in a single round-trip."""
    if not qualities:
        # Nothing to write; avoid issuing an empty executemany.
        return
    rows = [{"data": dataset_id, **quality} for quality in qualities]
    connection.execute(
        text(
            """
            INSERT INTO data_quality(`data`, `quality`, `value`)
            VALUES (:data, :quality, :value)
            """,
        ),
        rows,
    )
Loading