From eee088ccde1d657b0a612c45ac7440bbc70db21a Mon Sep 17 00:00:00 2001 From: Vivekgupta008 Date: Sun, 22 Feb 2026 02:24:10 +0530 Subject: [PATCH 1/2] add Parquet dataset upload endpoint --- pyproject.toml | 3 + src/config.py | 4 + src/config.toml | 8 + src/core/parquet.py | 77 +++++++++ src/core/storage.py | 59 +++++++ src/database/datasets.py | 157 ++++++++++++++++++ src/routers/openml/datasets.py | 152 +++++++++++++++++- src/schemas/datasets/upload.py | 29 ++++ tests/core/parquet_test.py | 86 ++++++++++ tests/routers/openml/dataset_upload_test.py | 167 ++++++++++++++++++++ 10 files changed, 740 insertions(+), 2 deletions(-) create mode 100644 src/core/parquet.py create mode 100644 src/core/storage.py create mode 100644 src/schemas/datasets/upload.py create mode 100644 tests/core/parquet_test.py create mode 100644 tests/routers/openml/dataset_upload_test.py diff --git a/pyproject.toml b/pyproject.toml index d3b013c7..09d7e8b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,9 @@ dependencies = [ "mysqlclient", "python_dotenv", "xmltodict", + "pyarrow>=14.0", + "boto3>=1.34", + "python-multipart>=0.0.9", ] [project.optional-dependencies] diff --git a/src/config.py b/src/config.py index 4a234a51..629d204e 100644 --- a/src/config.py +++ b/src/config.py @@ -79,3 +79,7 @@ def load_database_configuration(file: Path = _config_file) -> TomlTable: def load_configuration(file: Path = _config_file) -> TomlTable: return tomllib.loads(file.read_text()) + + +def load_minio_configuration(file: Path = _config_file) -> TomlTable: + return typing.cast("TomlTable", _load_configuration(file).get("minio", {})) diff --git a/src/config.toml b/src/config.toml index 10d75534..d5039e17 100644 --- a/src/config.toml +++ b/src/config.toml @@ -22,3 +22,11 @@ database="openml" [routing] minio_url="http://minio:9000/" server_url="http://php-api:80/" + +[minio] +endpoint_url="http://minio:9000" +bucket="datasets" +# Credentials should be provided via environment variables: +# OPENML_MINIO_ACCESS_KEY and OPENML_MINIO_SECRET_KEY +access_key="minioadmin" +secret_key="minioadmin" diff --git a/src/core/parquet.py b/src/core/parquet.py new file mode 100644 index 00000000..5b5d3815 --- /dev/null +++ b/src/core/parquet.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import hashlib +import io +from dataclasses import dataclass, field + +import pyarrow as pa +import pyarrow.parquet as pq + +from schemas.datasets.openml import FeatureType + + +def map_arrow_type(arrow_type: pa.DataType) -> FeatureType: + """Map a PyArrow DataType to an OpenML FeatureType.""" + if pa.types.is_floating(arrow_type) or pa.types.is_integer(arrow_type) or pa.types.is_decimal( + arrow_type + ): + return FeatureType.NUMERIC + if pa.types.is_boolean(arrow_type) or pa.types.is_dictionary(arrow_type): + return FeatureType.NOMINAL + return FeatureType.STRING + + +@dataclass +class ColumnMeta: + index: int + name: str + data_type: FeatureType + number_of_missing_values: int + + +@dataclass +class ParquetMeta: + num_rows: int + num_columns: int + md5_checksum: str + columns: list[ColumnMeta] = field(default_factory=list) + + +def read_parquet_metadata(file_bytes: bytes) -> ParquetMeta: + """Parse *file_bytes* as Parquet and extract schema / quality metadata. + + Raises ``ValueError`` if the bytes are not a valid Parquet file. + """ + try: + buf = io.BytesIO(file_bytes) + pf = pq.ParquetFile(buf) + except Exception as exc: + msg = "File is not a valid Parquet file." 
+ raise ValueError(msg) from exc + + schema = pf.schema_arrow + num_rows = pf.metadata.num_rows + md5 = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest() # noqa: S324 + + # Read full table once to count per-column nulls + table = pf.read() + + columns: list[ColumnMeta] = [] + for idx, col_name in enumerate(schema.names): + col = table.column(col_name) + null_count = col.null_count + columns.append( + ColumnMeta( + index=idx, + name=col_name, + data_type=map_arrow_type(schema.field(col_name).type), + number_of_missing_values=null_count, + ) + ) + + return ParquetMeta( + num_rows=num_rows, + num_columns=len(columns), + md5_checksum=md5, + columns=columns, + ) diff --git a/src/core/storage.py b/src/core/storage.py new file mode 100644 index 00000000..088b1ea2 --- /dev/null +++ b/src/core/storage.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import io +import logging +import os +from typing import TYPE_CHECKING + +import boto3 +from botocore.exceptions import BotoCoreError, ClientError + +from config import _load_configuration, _config_file + +if TYPE_CHECKING: + from pathlib import Path + +logger = logging.getLogger(__name__) + +MINIO_ACCESS_KEY_ENV = "OPENML_MINIO_ACCESS_KEY" +MINIO_SECRET_KEY_ENV = "OPENML_MINIO_SECRET_KEY" # noqa: S105 + + +def _minio_config(file: Path = _config_file) -> dict[str, str]: + cfg = _load_configuration(file).get("minio", {}) + return { + "endpoint_url": cfg.get("endpoint_url", "http://minio:9000"), + "bucket": cfg.get("bucket", "datasets"), + "access_key": os.environ.get(MINIO_ACCESS_KEY_ENV, cfg.get("access_key", "minioadmin")), + "secret_key": os.environ.get(MINIO_SECRET_KEY_ENV, cfg.get("secret_key", "minioadmin")), + } + + +def _object_key(dataset_id: int) -> str: + """Return the MinIO object key for a dataset, matching the existing URL pattern.""" + ten_thousands_prefix = f"{dataset_id // 10_000:04d}" + padded_id = f"{dataset_id:04d}" + return f"datasets/{ten_thousands_prefix}/{padded_id}/dataset_{dataset_id}.pq" + + +def upload_to_minio(file_bytes: bytes, dataset_id: int) -> str: + """Upload *file_bytes* to MinIO and return the object key. + + Raises ``RuntimeError`` on upload failure so callers can convert to HTTP 500. 
+ """ + cfg = _minio_config() + key = _object_key(dataset_id) + try: + client = boto3.client( + "s3", + endpoint_url=cfg["endpoint_url"], + aws_access_key_id=cfg["access_key"], + aws_secret_access_key=cfg["secret_key"], + ) + client.upload_fileobj(io.BytesIO(file_bytes), cfg["bucket"], key) + logger.info("Uploaded dataset %d to MinIO at key '%s'", dataset_id, key) + except (BotoCoreError, ClientError) as exc: + msg = f"Failed to upload dataset {dataset_id} to MinIO: {exc}" + logger.exception(msg) + raise RuntimeError(msg) from exc + return key diff --git a/src/database/datasets.py b/src/database/datasets.py index f011a651..1e8addaf 100644 --- a/src/database/datasets.py +++ b/src/database/datasets.py @@ -178,3 +178,160 @@ def remove_deactivated_status(dataset_id: int, connection: Connection) -> None: ), parameters={"data": dataset_id}, ) + + +def insert_file( + *, + file_name: str, + reference: str, + md5_hash: str, + connection: Connection, +) -> int: + """Insert a row into the `file` table and return the new file id.""" + connection.execute( + text( + """ + INSERT INTO file(`name`, `reference`, `md5_hash`) + VALUES (:name, :reference, :md5_hash) + """, + ), + parameters={"name": file_name, "reference": reference, "md5_hash": md5_hash}, + ) + result = connection.execute(text("SELECT LAST_INSERT_ID()")) + (file_id,) = result.one() + return int(file_id) + + +def insert_dataset( # noqa: PLR0913 + *, + name: str, + description: str, + format_: str, + file_id: int, + uploader: int, + visibility: str, + licence: str, + language: str, + default_target_attribute: str, + original_data_url: str, + paper_url: str, + collection_date: str, + citation: str, + md5_checksum: str, + connection: Connection, +) -> int: + """Insert a row into the `dataset` table and return the new dataset id.""" + connection.execute( + text( + """ + INSERT INTO dataset( + `name`, `description`, `format`, `file_id`, `uploader`, + `visibility`, `licence`, `language`, + `default_target_attribute`, `original_data_url`, `paper_url`, + `collection_date`, `citation`, `md5_checksum`, + `version`, `upload_date` + ) + VALUES ( + :name, :description, :format, :file_id, :uploader, + :visibility, :licence, :language, + :default_target_attribute, :original_data_url, :paper_url, + :collection_date, :citation, :md5_checksum, + 1, NOW() + ) + """, + ), + parameters={ + "name": name, + "description": description, + "format": format_, + "file_id": file_id, + "uploader": uploader, + "visibility": visibility, + "licence": licence, + "language": language, + "default_target_attribute": default_target_attribute, + "original_data_url": original_data_url, + "paper_url": paper_url, + "collection_date": collection_date, + "citation": citation, + "md5_checksum": md5_checksum, + }, + ) + result = connection.execute(text("SELECT LAST_INSERT_ID()")) + (dataset_id,) = result.one() + return int(dataset_id) + + +def insert_description( + *, + dataset_id: int, + description: str, + connection: Connection, +) -> None: + """Insert the initial description into the `dataset_description` table.""" + connection.execute( + text( + """ + INSERT INTO dataset_description(`did`, `description`, `version`) + VALUES (:did, :description, 1) + """, + ), + parameters={"did": dataset_id, "description": description}, + ) + + +def insert_features( + *, + dataset_id: int, + features: list[dict[str, object]], + connection: Connection, +) -> None: + """Bulk-insert feature rows into `data_feature`. 
+ + Each dict in *features* must have: index, name, data_type, is_target, + is_row_identifier, is_ignore, number_of_missing_values. + """ + if not features: + return + for feat in features: + connection.execute( + text( + """ + INSERT INTO data_feature( + `did`, `index`, `name`, `data_type`, + `is_target`, `is_row_identifier`, `is_ignore`, + `NumberOfMissingValues` + ) + VALUES ( + :did, :index, :name, :data_type, + :is_target, :is_row_identifier, :is_ignore, + :number_of_missing_values + ) + """, + ), + parameters={"did": dataset_id, **feat}, + ) + + +def insert_qualities( + *, + dataset_id: int, + qualities: list[dict[str, object]], + connection: Connection, +) -> None: + """Insert quality rows into `data_quality`. + + Each dict must have: quality (str), value (float | None). + """ + if not qualities: + return + for q in qualities: + connection.execute( + text( + """ + INSERT INTO data_quality(`data`, `quality`, `value`) + VALUES (:data, :quality, :value) + """, + ), + parameters={"data": dataset_id, **q}, + ) diff --git a/src/routers/openml/datasets.py b/src/routers/openml/datasets.py index dda25117..8468cb6c 100644 --- a/src/routers/openml/datasets.py +++ b/src/routers/openml/datasets.py @@ -1,10 +1,11 @@ +import json import re from datetime import datetime from enum import StrEnum from http import HTTPStatus from typing import Annotated, Any, Literal, NamedTuple -from fastapi import APIRouter, Body, Depends, HTTPException +from fastapi import APIRouter, Body, Depends, Form, HTTPException, UploadFile from sqlalchemy import Connection, text from sqlalchemy.engine import Row @@ -18,14 +19,161 @@ _format_error, _format_parquet_url, ) +from core.parquet import read_parquet_metadata +from core.storage import upload_to_minio from database.users import User, UserGroup from routers.dependencies import Pagination, expdb_connection, fetch_user, userdb_connection from routers.types import CasualString128, IntegerRange, SystemString64, integer_range_regex -from schemas.datasets.openml import DatasetMetadata, DatasetStatus, Feature, FeatureType +from schemas.datasets.openml import ( + DatasetFileFormat, + DatasetMetadata, + DatasetStatus, + Feature, + FeatureType, +) +from schemas.datasets.upload import DatasetUploadMetadata, DatasetUploadResponse router = APIRouter(prefix="/datasets", tags=["datasets"]) +@router.post( + path="/upload", + summary="Upload a Parquet dataset", + status_code=HTTPStatus.CREATED, +) +def upload_dataset( + file: Annotated[UploadFile, ...], + metadata: Annotated[str, Form(description="JSON-encoded DatasetUploadMetadata")], + user: Annotated[User | None, Depends(fetch_user)] = None, + expdb_db: Annotated[Connection, Depends(expdb_connection)] = None, +) -> DatasetUploadResponse: + """Upload a Parquet file as a new OpenML dataset. + + Send as multipart/form-data with: + - **file**: the `.parquet` file + - **metadata**: a JSON string with name, description, visibility, etc. + """ + if user is None: + raise HTTPException( + status_code=HTTPStatus.UNAUTHORIZED, + detail={ + "code": "103", + "message": "Authentication failed. 
Please provide a valid API key.", + }, + ) + + # --- Validate file type --- + filename = file.filename or "" + if not filename.lower().endswith(".parquet"): + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST, + detail={"code": "110", "message": "Only .parquet files are accepted."}, + ) + + # --- Parse metadata JSON --- + try: + upload_meta = DatasetUploadMetadata(**json.loads(metadata)) + except (ValueError, TypeError) as exc: + raise HTTPException( + status_code=HTTPStatus.UNPROCESSABLE_ENTITY, + detail={"code": "111", "message": f"Invalid metadata JSON: {exc}"}, + ) from exc + + # --- Read & validate Parquet --- + file_bytes = file.file.read() + try: + parquet_meta = read_parquet_metadata(file_bytes) + except ValueError as exc: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST, + detail={"code": "112", "message": str(exc)}, + ) from exc + + # --- DB: insert file record (MinIO reference filled after we know the did) --- + file_id = database.datasets.insert_file( + file_name=filename, + reference="", + md5_hash=parquet_meta.md5_checksum, + connection=expdb_db, + ) + + # --- DB: insert dataset record --- + dataset_id = database.datasets.insert_dataset( + name=upload_meta.name, + description=upload_meta.description, + format_=DatasetFileFormat.PARQUET, + file_id=file_id, + uploader=user.user_id, + visibility=upload_meta.visibility, + licence=upload_meta.licence, + language=upload_meta.language, + default_target_attribute=upload_meta.default_target_attribute, + original_data_url=upload_meta.original_data_url, + paper_url=upload_meta.paper_url, + collection_date=upload_meta.collection_date, + citation=upload_meta.citation, + md5_checksum=parquet_meta.md5_checksum, + connection=expdb_db, + ) + + # --- Upload actual file to MinIO (now we know dataset_id) --- + try: + upload_to_minio(file_bytes, dataset_id) + except RuntimeError as exc: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail={"code": "113", "message": str(exc)}, + ) from exc + + # --- DB: description, features, qualities, status --- + database.datasets.insert_description( + dataset_id=dataset_id, + description=upload_meta.description, + connection=expdb_db, + ) + + features = [ + { + "index": col.index, + "name": col.name, + "data_type": col.data_type, + "is_target": col.name == upload_meta.default_target_attribute, + "is_row_identifier": False, + "is_ignore": False, + "number_of_missing_values": col.number_of_missing_values, + } + for col in parquet_meta.columns + ] + database.datasets.insert_features( + dataset_id=dataset_id, + features=features, + connection=expdb_db, + ) + + qualities = [ + {"quality": "NumberOfInstances", "value": float(parquet_meta.num_rows)}, + {"quality": "NumberOfFeatures", "value": float(parquet_meta.num_columns)}, + { + "quality": "NumberOfMissingValues", + "value": float(sum(c.number_of_missing_values for c in parquet_meta.columns)), + }, + ] + database.datasets.insert_qualities( + dataset_id=dataset_id, + qualities=qualities, + connection=expdb_db, + ) + + database.datasets.update_status( + dataset_id=dataset_id, + status="in_preparation", + user_id=user.user_id, + connection=expdb_db, + ) + + return DatasetUploadResponse(upload_dataset={"id": dataset_id}) + + @router.post( path="/tag", ) diff --git a/src/schemas/datasets/upload.py b/src/schemas/datasets/upload.py new file mode 100644 index 00000000..d5e1dec2 --- /dev/null +++ b/src/schemas/datasets/upload.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from pydantic import BaseModel, Field + 
+from schemas.datasets.openml import Visibility + + +class DatasetUploadMetadata(BaseModel): + """Metadata provided alongside the uploaded Parquet file.""" + + name: str = Field(description="Human-readable name of the dataset.", min_length=1, max_length=256) + description: str = Field(description="Description of the dataset.", min_length=1) + default_target_attribute: str = Field( + default="", + description="Comma-separated column name(s) to use as the prediction target.", + ) + visibility: Visibility = Field(default=Visibility.PUBLIC, description="Dataset visibility.") + licence: str = Field(default="CC0", description="Dataset licence.") + language: str = Field(default="English", description="Language of the dataset.") + citation: str = Field(default="", description="Citation string for the dataset.") + original_data_url: str = Field(default="", description="URL of the original data source.") + paper_url: str = Field(default="", description="URL of a related paper.") + collection_date: str = Field(default="", description="When the data was collected.") + + +class DatasetUploadResponse(BaseModel): + """Response returned after a successful dataset upload.""" + + upload_dataset: dict[str, int] diff --git a/tests/core/parquet_test.py b/tests/core/parquet_test.py new file mode 100644 index 00000000..a945a8b9 --- /dev/null +++ b/tests/core/parquet_test.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import io + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from core.parquet import FeatureType, ParquetMeta, map_arrow_type, read_parquet_metadata + + +def _make_parquet_bytes(**columns: pa.Array) -> bytes: + """Build an in-memory Parquet file from keyword-arg columns.""" + table = pa.table(columns) + buf = io.BytesIO() + pq.write_table(table, buf) + return buf.getvalue() + + +@pytest.mark.parametrize( + ("arrow_type", "expected"), + [ + (pa.int32(), FeatureType.NUMERIC), + (pa.int64(), FeatureType.NUMERIC), + (pa.float32(), FeatureType.NUMERIC), + (pa.float64(), FeatureType.NUMERIC), + (pa.bool_(), FeatureType.NOMINAL), + (pa.dictionary(pa.int8(), pa.utf8()), FeatureType.NOMINAL), + (pa.string(), FeatureType.STRING), + (pa.utf8(), FeatureType.STRING), + (pa.date32(), FeatureType.STRING), + (pa.timestamp("ms"), FeatureType.STRING), + ], +) +def test_map_arrow_type(arrow_type: pa.DataType, expected: FeatureType) -> None: + assert map_arrow_type(arrow_type) == expected + + +def test_read_parquet_metadata_returns_correct_shape() -> None: + data = _make_parquet_bytes( + a=pa.array([1, 2, 3], type=pa.int32()), + b=pa.array([1.0, 2.0, 3.0], type=pa.float64()), + label=pa.array(["x", "y", "z"], type=pa.string()), + ) + meta: ParquetMeta = read_parquet_metadata(data) + + assert meta.num_rows == 3 + assert meta.num_columns == 3 + assert len(meta.columns) == 3 + assert meta.md5_checksum # non-empty + + +def test_read_parquet_metadata_column_types() -> None: + data = _make_parquet_bytes( + numeric=pa.array([1, 2], type=pa.int64()), + text=pa.array(["a", "b"], type=pa.string()), + ) + meta = read_parquet_metadata(data) + + col_map = {c.name: c.data_type for c in meta.columns} + assert col_map["numeric"] == FeatureType.NUMERIC + assert col_map["text"] == FeatureType.STRING + + +def test_read_parquet_metadata_counts_missing_values() -> None: + data = _make_parquet_bytes( + col=pa.array([1, None, 3, None], type=pa.int32()), + ) + meta = read_parquet_metadata(data) + assert meta.columns[0].number_of_missing_values == 2 + + +def test_read_parquet_metadata_zero_missing_values() -> 
None: + data = _make_parquet_bytes(col=pa.array([1, 2, 3], type=pa.int32())) + meta = read_parquet_metadata(data) + assert meta.columns[0].number_of_missing_values == 0 + + +def test_read_parquet_metadata_raises_on_invalid_bytes() -> None: + with pytest.raises(ValueError, match="valid Parquet"): + read_parquet_metadata(b"this is not parquet data at all!") + + +def test_read_parquet_metadata_md5_is_deterministic() -> None: + data = _make_parquet_bytes(x=pa.array([1, 2, 3])) + assert read_parquet_metadata(data).md5_checksum == read_parquet_metadata(data).md5_checksum diff --git a/tests/routers/openml/dataset_upload_test.py b/tests/routers/openml/dataset_upload_test.py new file mode 100644 index 00000000..b918d9f9 --- /dev/null +++ b/tests/routers/openml/dataset_upload_test.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import io +import json +from http import HTTPStatus +from unittest.mock import MagicMock, patch + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import Connection + +from database.users import User, UserGroup +from main import create_api +from routers.dependencies import expdb_connection, fetch_user, userdb_connection + +_SOME_USER = User(user_id=2, _database=None, _groups=[UserGroup.READ_WRITE]) + +_METADATA = { + "name": "test-iris", + "description": "A test dataset", + "default_target_attribute": "label", + "visibility": "public", + "licence": "CC0", + "language": "English", + "citation": "", +} + + +def _make_parquet_bytes() -> bytes: + """Build a minimal valid Parquet file for tests.""" + table = pa.table( + { + "sepal_length": pa.array([5.1, 4.9, 4.7], type=pa.float64()), + "sepal_width": pa.array([3.5, 3.0, 3.2], type=pa.float64()), + "label": pa.array(["setosa", "setosa", "virginica"], type=pa.string()), + } + ) + buf = io.BytesIO() + pq.write_table(table, buf) + return buf.getvalue() + + +def _upload( + client: TestClient, + *, + file_bytes: bytes, + filename: str = "iris.parquet", + extra_meta: dict | None = None, +) -> object: + meta = {**_METADATA, **(extra_meta or {})} + files = {"file": (filename, io.BytesIO(file_bytes), "application/octet-stream")} + data = {"metadata": json.dumps(meta)} + return client.post("/datasets/upload", files=files, data=data) + + +@pytest.fixture +def mock_connection() -> MagicMock: + conn = MagicMock(spec=Connection) + conn.execute.return_value = MagicMock(one=MagicMock(return_value=(42,))) + return conn + + +@pytest.fixture +def api_client_authenticated(mock_connection: MagicMock) -> TestClient: + """TestClient with DB connections mocked and user injected (no Docker needed).""" + app = create_api() + app.dependency_overrides[expdb_connection] = lambda: mock_connection + app.dependency_overrides[userdb_connection] = lambda: mock_connection + app.dependency_overrides[fetch_user] = lambda: _SOME_USER + return TestClient(app) + + +@pytest.fixture +def api_client_unauthenticated(mock_connection: MagicMock) -> TestClient: + """TestClient with no authenticated user.""" + app = create_api() + app.dependency_overrides[expdb_connection] = lambda: mock_connection + app.dependency_overrides[userdb_connection] = lambda: mock_connection + app.dependency_overrides[fetch_user] = lambda: None + return TestClient(app) + + +def test_upload_unauthenticated(api_client_unauthenticated: TestClient) -> None: + response = _upload(api_client_unauthenticated, file_bytes=_make_parquet_bytes()) + assert response.status_code == HTTPStatus.UNAUTHORIZED + + +def 
test_upload_non_parquet_file(api_client_authenticated: TestClient) -> None: + response = _upload(api_client_authenticated, file_bytes=b"col1,col2\n1,2\n", filename="data.csv") + assert response.status_code == HTTPStatus.BAD_REQUEST + assert response.json()["detail"]["code"] == "110" + + +def test_upload_invalid_parquet_bytes(api_client_authenticated: TestClient) -> None: + response = _upload(api_client_authenticated, file_bytes=b"definitely not parquet") + assert response.status_code == HTTPStatus.BAD_REQUEST + assert response.json()["detail"]["code"] == "112" + + +def test_upload_invalid_metadata_json(api_client_authenticated: TestClient) -> None: + files = {"file": ("iris.parquet", io.BytesIO(_make_parquet_bytes()), "application/octet-stream")} + data = {"metadata": "NOT VALID JSON {{{"} + response = api_client_authenticated.post("/datasets/upload", files=files, data=data) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + +def test_upload_parquet_success(api_client_authenticated: TestClient) -> None: + file_bytes = _make_parquet_bytes() + + with ( + patch("routers.openml.datasets.upload_to_minio", return_value="key"), + patch("database.datasets.insert_file", return_value=99), + patch("database.datasets.insert_dataset", return_value=42), + patch("database.datasets.insert_description"), + patch("database.datasets.insert_features"), + patch("database.datasets.insert_qualities"), + patch("database.datasets.update_status"), + ): + response = _upload(api_client_authenticated, file_bytes=file_bytes) + + assert response.status_code == HTTPStatus.CREATED + body = response.json() + assert body["upload_dataset"]["id"] == 42 + + +def test_upload_minio_failure_returns_500(api_client_authenticated: TestClient) -> None: + file_bytes = _make_parquet_bytes() + + with ( + patch("routers.openml.datasets.upload_to_minio", side_effect=RuntimeError("connection refused")), + patch("database.datasets.insert_file", return_value=99), + patch("database.datasets.insert_dataset", return_value=42), + ): + response = _upload(api_client_authenticated, file_bytes=file_bytes) + + assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + assert response.json()["detail"]["code"] == "113" + + +def test_upload_features_extracted_correctly(api_client_authenticated: TestClient) -> None: + file_bytes = _make_parquet_bytes() + captured: list = [] + + def capture_features(*, dataset_id: int, features: list, connection: object) -> None: + captured.extend(features) + + with ( + patch("routers.openml.datasets.upload_to_minio", return_value="key"), + patch("database.datasets.insert_file", return_value=99), + patch("database.datasets.insert_dataset", return_value=42), + patch("database.datasets.insert_description"), + patch("database.datasets.insert_features", side_effect=capture_features), + patch("database.datasets.insert_qualities"), + patch("database.datasets.update_status"), + ): + response = _upload(api_client_authenticated, file_bytes=file_bytes) + + assert response.status_code == HTTPStatus.CREATED + names = [f["name"] for f in captured] + assert "sepal_length" in names + assert "label" in names + label_feat = next(f for f in captured if f["name"] == "label") + assert label_feat["is_target"] is True + sepal_feat = next(f for f in captured if f["name"] == "sepal_length") + assert sepal_feat["is_target"] is False From 093717a3a1a9b8a0ca5c470309d0368dfb523263 Mon Sep 17 00:00:00 2001 From: Vivekgupta008 Date: Sun, 22 Feb 2026 11:27:12 +0530 Subject: [PATCH 2/2] Done suggested changes --- src/config.toml 
| 4 +- src/core/parquet.py | 20 ++- src/core/storage.py | 14 +- src/database/datasets.py | 78 +++++---- src/routers/openml/datasets.py | 35 +++- src/schemas/datasets/upload.py | 6 +- tests/core/__init__.py | 0 tests/core/parquet_test.py | 12 +- tests/routers/openml/dataset_upload_test.py | 176 +++++++++++++++++--- 9 files changed, 264 insertions(+), 81 deletions(-) create mode 100644 tests/core/__init__.py diff --git a/src/config.toml b/src/config.toml index d5039e17..7cd809c2 100644 --- a/src/config.toml +++ b/src/config.toml @@ -26,7 +26,5 @@ server_url="http://php-api:80/" [minio] endpoint_url="http://minio:9000" bucket="datasets" -# Credentials should be provided via environment variables: +# Credentials must be provided via environment variables: # OPENML_MINIO_ACCESS_KEY and OPENML_MINIO_SECRET_KEY -access_key="minioadmin" -secret_key="minioadmin" diff --git a/src/core/parquet.py b/src/core/parquet.py index 5b5d3815..7848c31f 100644 --- a/src/core/parquet.py +++ b/src/core/parquet.py @@ -9,11 +9,23 @@ from schemas.datasets.openml import FeatureType +__all__ = [ + "ColumnMeta", + "FeatureType", + "ParquetMeta", + "map_arrow_type", + "read_parquet_metadata", +] + def map_arrow_type(arrow_type: pa.DataType) -> FeatureType: """Map a PyArrow DataType to an OpenML FeatureType.""" - if pa.types.is_floating(arrow_type) or pa.types.is_integer(arrow_type) or pa.types.is_decimal( - arrow_type + if ( + pa.types.is_floating(arrow_type) + or pa.types.is_integer(arrow_type) + or pa.types.is_decimal( + arrow_type, + ) ): return FeatureType.NUMERIC if pa.types.is_boolean(arrow_type) or pa.types.is_dictionary(arrow_type): @@ -51,7 +63,7 @@ def read_parquet_metadata(file_bytes: bytes) -> ParquetMeta: schema = pf.schema_arrow num_rows = pf.metadata.num_rows - md5 = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest() # noqa: S324 + md5 = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest() # Read full table once to count per-column nulls table = pf.read() @@ -66,7 +78,7 @@ def read_parquet_metadata(file_bytes: bytes) -> ParquetMeta: name=col_name, data_type=map_arrow_type(schema.field(col_name).type), number_of_missing_values=null_count, - ) + ), ) return ParquetMeta( diff --git a/src/core/storage.py b/src/core/storage.py index 088b1ea2..8e6554b0 100644 --- a/src/core/storage.py +++ b/src/core/storage.py @@ -8,7 +8,7 @@ import boto3 from botocore.exceptions import BotoCoreError, ClientError -from config import _load_configuration, _config_file +from config import _config_file, _load_configuration if TYPE_CHECKING: from pathlib import Path @@ -21,11 +21,19 @@ def _minio_config(file: Path = _config_file) -> dict[str, str]: cfg = _load_configuration(file).get("minio", {}) + access_key = os.environ.get(MINIO_ACCESS_KEY_ENV) or cfg.get("access_key", "") + secret_key = os.environ.get(MINIO_SECRET_KEY_ENV) or cfg.get("secret_key", "") + if not access_key or not secret_key: + msg = ( + f"MinIO credentials not found. Set {MINIO_ACCESS_KEY_ENV} and " + f"{MINIO_SECRET_KEY_ENV} environment variables." 
+ ) + raise RuntimeError(msg) return { "endpoint_url": cfg.get("endpoint_url", "http://minio:9000"), "bucket": cfg.get("bucket", "datasets"), - "access_key": os.environ.get(MINIO_ACCESS_KEY_ENV, cfg.get("access_key", "minioadmin")), - "secret_key": os.environ.get(MINIO_SECRET_KEY_ENV, cfg.get("secret_key", "minioadmin")), + "access_key": access_key, + "secret_key": secret_key, } diff --git a/src/database/datasets.py b/src/database/datasets.py index 1e8addaf..90cfd8bf 100644 --- a/src/database/datasets.py +++ b/src/database/datasets.py @@ -202,6 +202,19 @@ def insert_file( return int(file_id) +def update_file_reference( + *, + file_id: int, + reference: str, + connection: Connection, +) -> None: + """Update the MinIO object key on an existing file row.""" + connection.execute( + text("UPDATE file SET `reference` = :reference WHERE `id` = :file_id"), + parameters={"reference": reference, "file_id": file_id}, + ) + + def insert_dataset( # noqa: PLR0913 *, name: str, @@ -286,31 +299,26 @@ def insert_features( features: list[dict[str, object]], connection: Connection, ) -> None: - """Bulk-insert feature rows into `data_feature`. - - Each dict in *features* must have: index, name, data_type, is_target, - is_row_identifier, is_ignore, number_of_missing_values. - """ + """Bulk-insert feature rows into `data_feature` in a single round-trip.""" if not features: return - for feat in features: - connection.execute( - text( - """ - INSERT INTO data_feature( - `did`, `index`, `name`, `data_type`, - `is_target`, `is_row_identifier`, `is_ignore`, - `NumberOfMissingValues` - ) - VALUES ( - :did, :index, :name, :data_type, - :is_target, :is_row_identifier, :is_ignore, - :number_of_missing_values - ) - """, - ), - parameters={"did": dataset_id, **feat}, - ) + connection.execute( + text( + """ + INSERT INTO data_feature( + `did`, `index`, `name`, `data_type`, + `is_target`, `is_row_identifier`, `is_ignore`, + `NumberOfMissingValues` + ) + VALUES ( + :did, :index, :name, :data_type, + :is_target, :is_row_identifier, :is_ignore, + :number_of_missing_values + ) + """, + ), + [{"did": dataset_id, **feat} for feat in features], + ) def insert_qualities( @@ -319,19 +327,15 @@ def insert_qualities( qualities: list[dict[str, object]], connection: Connection, ) -> None: - """Insert quality rows into `data_quality`. - - Each dict must have: quality (str), value (float | None). 
- """ + """Bulk-insert quality rows into `data_quality` in a single round-trip.""" if not qualities: return - for q in qualities: - connection.execute( - text( - """ - INSERT INTO data_quality(`data`, `quality`, `value`) - VALUES (:data, :quality, :value) - """, - ), - parameters={"data": dataset_id, **q}, - ) + connection.execute( + text( + """ + INSERT INTO data_quality(`data`, `quality`, `value`) + VALUES (:data, :quality, :value) + """, + ), + [{"data": dataset_id, **q} for q in qualities], + ) diff --git a/src/routers/openml/datasets.py b/src/routers/openml/datasets.py index 8468cb6c..3ee08bb1 100644 --- a/src/routers/openml/datasets.py +++ b/src/routers/openml/datasets.py @@ -89,7 +89,23 @@ def upload_dataset( detail={"code": "112", "message": str(exc)}, ) from exc - # --- DB: insert file record (MinIO reference filled after we know the did) --- + # --- Validate target attribute exists in the Parquet schema --- + target_attribute = upload_meta.default_target_attribute + if target_attribute is not None: + parquet_column_names = {col.name for col in parquet_meta.columns} + if target_attribute not in parquet_column_names: + raise HTTPException( + status_code=HTTPStatus.UNPROCESSABLE_ENTITY, + detail={ + "code": "114", + "message": ( + f"Default target attribute '{target_attribute}' " + "does not exist in the uploaded dataset columns." + ), + }, + ) + + # --- DB: insert file and dataset rows (reference updated after upload) --- file_id = database.datasets.insert_file( file_name=filename, reference="", @@ -97,7 +113,6 @@ def upload_dataset( connection=expdb_db, ) - # --- DB: insert dataset record --- dataset_id = database.datasets.insert_dataset( name=upload_meta.name, description=upload_meta.description, @@ -107,7 +122,7 @@ def upload_dataset( visibility=upload_meta.visibility, licence=upload_meta.licence, language=upload_meta.language, - default_target_attribute=upload_meta.default_target_attribute, + default_target_attribute=target_attribute, original_data_url=upload_meta.original_data_url, paper_url=upload_meta.paper_url, collection_date=upload_meta.collection_date, @@ -116,15 +131,23 @@ def upload_dataset( connection=expdb_db, ) - # --- Upload actual file to MinIO (now we know dataset_id) --- + # --- Upload file to MinIO; roll back DB on failure to avoid orphan rows --- try: - upload_to_minio(file_bytes, dataset_id) + minio_key = upload_to_minio(file_bytes, dataset_id) except RuntimeError as exc: + expdb_db.rollback() raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail={"code": "113", "message": str(exc)}, ) from exc + # Persist the real object location now that we have the key + database.datasets.update_file_reference( + file_id=file_id, + reference=minio_key, + connection=expdb_db, + ) + # --- DB: description, features, qualities, status --- database.datasets.insert_description( dataset_id=dataset_id, @@ -137,7 +160,7 @@ def upload_dataset( "index": col.index, "name": col.name, "data_type": col.data_type, - "is_target": col.name == upload_meta.default_target_attribute, + "is_target": col.name == target_attribute, "is_row_identifier": False, "is_ignore": False, "number_of_missing_values": col.number_of_missing_values, diff --git a/src/schemas/datasets/upload.py b/src/schemas/datasets/upload.py index d5e1dec2..1f0230f1 100644 --- a/src/schemas/datasets/upload.py +++ b/src/schemas/datasets/upload.py @@ -8,7 +8,11 @@ class DatasetUploadMetadata(BaseModel): """Metadata provided alongside the uploaded Parquet file.""" - name: str = Field(description="Human-readable 
name of the dataset.", min_length=1, max_length=256) + name: str = Field( + description="Human-readable name of the dataset.", + min_length=1, + max_length=256, + ) description: str = Field(description="Description of the dataset.", min_length=1) default_target_attribute: str = Field( default="", diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/parquet_test.py b/tests/core/parquet_test.py index a945a8b9..6fec24cd 100644 --- a/tests/core/parquet_test.py +++ b/tests/core/parquet_test.py @@ -8,6 +8,10 @@ from core.parquet import FeatureType, ParquetMeta, map_arrow_type, read_parquet_metadata +_NUM_TEST_ROWS = 3 +_NUM_TEST_COLS = 3 +_EXPECTED_MISSING = 2 + def _make_parquet_bytes(**columns: pa.Array) -> bytes: """Build an in-memory Parquet file from keyword-arg columns.""" @@ -44,9 +48,9 @@ def test_read_parquet_metadata_returns_correct_shape() -> None: ) meta: ParquetMeta = read_parquet_metadata(data) - assert meta.num_rows == 3 - assert meta.num_columns == 3 - assert len(meta.columns) == 3 + assert meta.num_rows == _NUM_TEST_ROWS + assert meta.num_columns == _NUM_TEST_COLS + assert len(meta.columns) == _NUM_TEST_COLS assert meta.md5_checksum # non-empty @@ -67,7 +71,7 @@ def test_read_parquet_metadata_counts_missing_values() -> None: col=pa.array([1, None, 3, None], type=pa.int32()), ) meta = read_parquet_metadata(data) - assert meta.columns[0].number_of_missing_values == 2 + assert meta.columns[0].number_of_missing_values == _EXPECTED_MISSING def test_read_parquet_metadata_zero_missing_values() -> None: diff --git a/tests/routers/openml/dataset_upload_test.py b/tests/routers/openml/dataset_upload_test.py index b918d9f9..e406d942 100644 --- a/tests/routers/openml/dataset_upload_test.py +++ b/tests/routers/openml/dataset_upload_test.py @@ -1,8 +1,11 @@ +"""Integration tests for POST /datasets/upload.""" + from __future__ import annotations import io import json from http import HTTPStatus +from typing import Any from unittest.mock import MagicMock, patch import pyarrow as pa @@ -15,9 +18,15 @@ from main import create_api from routers.dependencies import expdb_connection, fetch_user, userdb_connection +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + _SOME_USER = User(user_id=2, _database=None, _groups=[UserGroup.READ_WRITE]) +_EXPECTED_DATASET_ID = 42 +_EXPECTED_FILE_ID = 99 -_METADATA = { +_METADATA: dict[str, str] = { "name": "test-iris", "description": "A test dataset", "default_target_attribute": "label", @@ -27,15 +36,29 @@ "citation": "", } +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + -def _make_parquet_bytes() -> bytes: - """Build a minimal valid Parquet file for tests.""" +def _make_parquet_bytes( + *, + missing_in_col: int = 0, +) -> bytes: + """Build a minimal valid Parquet file. + + Args: + missing_in_col: number of nulls to inject into the first numeric column. 
+ """ + col_data: list[float | None] = [5.1, 4.9, 4.7] + if missing_in_col: + col_data = col_data[: len(col_data) - missing_in_col] + [None] * missing_in_col table = pa.table( { - "sepal_length": pa.array([5.1, 4.9, 4.7], type=pa.float64()), + "sepal_length": pa.array(col_data, type=pa.float64()), "sepal_width": pa.array([3.5, 3.0, 3.2], type=pa.float64()), "label": pa.array(["setosa", "setosa", "virginica"], type=pa.string()), - } + }, ) buf = io.BytesIO() pq.write_table(table, buf) @@ -47,8 +70,9 @@ def _upload( *, file_bytes: bytes, filename: str = "iris.parquet", - extra_meta: dict | None = None, -) -> object: + extra_meta: dict[str, str] | None = None, +) -> Any: # noqa: ANN401 + """Post a dataset upload request; returns the httpx Response.""" meta = {**_METADATA, **(extra_meta or {})} files = {"file": (filename, io.BytesIO(file_bytes), "application/octet-stream")} data = {"metadata": json.dumps(meta)} @@ -58,7 +82,9 @@ def _upload( @pytest.fixture def mock_connection() -> MagicMock: conn = MagicMock(spec=Connection) - conn.execute.return_value = MagicMock(one=MagicMock(return_value=(42,))) + conn.execute.return_value = MagicMock( + one=MagicMock(return_value=(_EXPECTED_DATASET_ID,)), + ) return conn @@ -88,7 +114,11 @@ def test_upload_unauthenticated(api_client_unauthenticated: TestClient) -> None: def test_upload_non_parquet_file(api_client_authenticated: TestClient) -> None: - response = _upload(api_client_authenticated, file_bytes=b"col1,col2\n1,2\n", filename="data.csv") + response = _upload( + api_client_authenticated, + file_bytes=b"col1,col2\n1,2\n", + filename="data.csv", + ) assert response.status_code == HTTPStatus.BAD_REQUEST assert response.json()["detail"]["code"] == "110" @@ -100,19 +130,36 @@ def test_upload_invalid_parquet_bytes(api_client_authenticated: TestClient) -> N def test_upload_invalid_metadata_json(api_client_authenticated: TestClient) -> None: - files = {"file": ("iris.parquet", io.BytesIO(_make_parquet_bytes()), "application/octet-stream")} + files = { + "file": ("iris.parquet", io.BytesIO(_make_parquet_bytes()), "application/octet-stream"), + } data = {"metadata": "NOT VALID JSON {{{"} response = api_client_authenticated.post("/datasets/upload", files=files, data=data) assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY +def test_upload_invalid_target_attribute(api_client_authenticated: TestClient) -> None: + """Target attribute not present in the Parquet schema → 422 before any DB writes.""" + response = _upload( + api_client_authenticated, + file_bytes=_make_parquet_bytes(), + extra_meta={"default_target_attribute": "nonexistent_column"}, + ) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + assert response.json()["detail"]["code"] == "114" + + def test_upload_parquet_success(api_client_authenticated: TestClient) -> None: file_bytes = _make_parquet_bytes() with ( - patch("routers.openml.datasets.upload_to_minio", return_value="key"), - patch("database.datasets.insert_file", return_value=99), - patch("database.datasets.insert_dataset", return_value=42), + patch( + "routers.openml.datasets.upload_to_minio", + return_value="datasets/0000/0042/dataset_42.pq", + ), + patch("database.datasets.insert_file", return_value=_EXPECTED_FILE_ID), + patch("database.datasets.insert_dataset", return_value=_EXPECTED_DATASET_ID), + patch("database.datasets.update_file_reference"), patch("database.datasets.insert_description"), patch("database.datasets.insert_features"), patch("database.datasets.insert_qualities"), @@ -122,34 +169,81 @@ def 
test_upload_parquet_success(api_client_authenticated: TestClient) -> None: assert response.status_code == HTTPStatus.CREATED body = response.json() - assert body["upload_dataset"]["id"] == 42 + assert body["upload_dataset"]["id"] == _EXPECTED_DATASET_ID -def test_upload_minio_failure_returns_500(api_client_authenticated: TestClient) -> None: +def test_upload_minio_key_is_persisted(api_client_authenticated: TestClient) -> None: + """update_file_reference must be called with the key returned by upload_to_minio.""" file_bytes = _make_parquet_bytes() + expected_key = "datasets/0000/0042/dataset_42.pq" + persisted: list[tuple[int, str]] = [] + + def capture_ref( + *, + file_id: int, + reference: str, + connection: object, # noqa: ARG001 + ) -> None: + persisted.append((file_id, reference)) with ( - patch("routers.openml.datasets.upload_to_minio", side_effect=RuntimeError("connection refused")), - patch("database.datasets.insert_file", return_value=99), - patch("database.datasets.insert_dataset", return_value=42), + patch("routers.openml.datasets.upload_to_minio", return_value=expected_key), + patch("database.datasets.insert_file", return_value=_EXPECTED_FILE_ID), + patch("database.datasets.insert_dataset", return_value=_EXPECTED_DATASET_ID), + patch("database.datasets.update_file_reference", side_effect=capture_ref), + patch("database.datasets.insert_description"), + patch("database.datasets.insert_features"), + patch("database.datasets.insert_qualities"), + patch("database.datasets.update_status"), + ): + response = _upload(api_client_authenticated, file_bytes=file_bytes) + + assert response.status_code == HTTPStatus.CREATED + assert len(persisted) == 1 + file_id, key = persisted[0] + assert file_id == _EXPECTED_FILE_ID + assert key == expected_key + + +def test_upload_minio_failure_rolls_back( + api_client_authenticated: TestClient, + mock_connection: MagicMock, +) -> None: + """On MinIO failure the endpoint must roll back the DB connection.""" + file_bytes = _make_parquet_bytes() + + with ( + patch( + "routers.openml.datasets.upload_to_minio", + side_effect=RuntimeError("connection refused"), + ), + patch("database.datasets.insert_file", return_value=_EXPECTED_FILE_ID), + patch("database.datasets.insert_dataset", return_value=_EXPECTED_DATASET_ID), ): response = _upload(api_client_authenticated, file_bytes=file_bytes) assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR assert response.json()["detail"]["code"] == "113" + mock_connection.rollback.assert_called_once() def test_upload_features_extracted_correctly(api_client_authenticated: TestClient) -> None: file_bytes = _make_parquet_bytes() - captured: list = [] - - def capture_features(*, dataset_id: int, features: list, connection: object) -> None: + captured: list[dict[str, object]] = [] + + def capture_features( + *, + dataset_id: int, # noqa: ARG001 + features: list[dict[str, object]], + connection: object, # noqa: ARG001 + ) -> None: captured.extend(features) with ( patch("routers.openml.datasets.upload_to_minio", return_value="key"), - patch("database.datasets.insert_file", return_value=99), - patch("database.datasets.insert_dataset", return_value=42), + patch("database.datasets.insert_file", return_value=_EXPECTED_FILE_ID), + patch("database.datasets.insert_dataset", return_value=_EXPECTED_DATASET_ID), + patch("database.datasets.update_file_reference"), patch("database.datasets.insert_description"), patch("database.datasets.insert_features", side_effect=capture_features), patch("database.datasets.insert_qualities"), @@ 
-165,3 +259,39 @@ def capture_features(*, dataset_id: int, features: list, connection: object) -> assert label_feat["is_target"] is True sepal_feat = next(f for f in captured if f["name"] == "sepal_length") assert sepal_feat["is_target"] is False + + +def test_upload_qualities_extracted_correctly(api_client_authenticated: TestClient) -> None: + """Qualities sent to insert_qualities must reflect the actual Parquet file stats.""" + file_bytes = _make_parquet_bytes(missing_in_col=1) + captured: list[dict[str, object]] = [] + + def capture_qualities( + *, + dataset_id: int, # noqa: ARG001 + qualities: list[dict[str, object]], + connection: object, # noqa: ARG001 + ) -> None: + captured.extend(qualities) + + with ( + patch("routers.openml.datasets.upload_to_minio", return_value="key"), + patch("database.datasets.insert_file", return_value=_EXPECTED_FILE_ID), + patch("database.datasets.insert_dataset", return_value=_EXPECTED_DATASET_ID), + patch("database.datasets.update_file_reference"), + patch("database.datasets.insert_description"), + patch("database.datasets.insert_features"), + patch("database.datasets.insert_qualities", side_effect=capture_qualities), + patch("database.datasets.update_status"), + ): + response = _upload(api_client_authenticated, file_bytes=file_bytes) + + assert response.status_code == HTTPStatus.CREATED + quality_map = {q["quality"]: q["value"] for q in captured} + + expected_rows = 3.0 + expected_cols = 3.0 + expected_missing = 1.0 + assert quality_map["NumberOfInstances"] == expected_rows + assert quality_map["NumberOfFeatures"] == expected_cols + assert quality_map["NumberOfMissingValues"] == expected_missing
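A minimal client-side sketch of how the new endpoint could be exercised once the patch is applied; illustrative only, not part of the patch. The base URL, API-key parameter name, and file path are assumptions here; the route itself only requires a multipart body with a `file` part and a JSON-encoded `metadata` form field, plus an authenticated user resolved by `fetch_user`.

import json

import requests  # assumption: any HTTP client works; requests is shown for brevity

# Placeholder metadata matching the DatasetUploadMetadata schema; adjust per dataset.
metadata = {
    "name": "test-iris",
    "description": "A test dataset",
    "default_target_attribute": "label",
    "visibility": "public",
    "licence": "CC0",
    "language": "English",
}

# The base URL and the way the API key is supplied are assumptions for this sketch.
with open("iris.parquet", "rb") as parquet_file:
    response = requests.post(
        "http://localhost:8000/datasets/upload",
        params={"api_key": "YOUR_API_KEY"},
        files={"file": ("iris.parquet", parquet_file, "application/octet-stream")},
        data={"metadata": json.dumps(metadata)},
        timeout=30,
    )

response.raise_for_status()
print(response.json())  # expected shape: {"upload_dataset": {"id": <new dataset id>}}

On success the endpoint returns HTTP 201 with the new dataset id, mirroring the `DatasetUploadResponse` schema added in this patch.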