diff --git a/.circleci/config.yml b/.circleci/config.yml index c5a838e..63ae597 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,7 +27,6 @@ jobs: flake8 --verbose dynamicio flake8 --verbose tests pylint -v dynamicio - pylint -v tests yamllint -v dynamicio yamllint -v tests diff --git a/.coveragerc b/.coveragerc index e95535c..b16fc61 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,4 +4,4 @@ omit = *__init__* [report] -fail_under = 90 +fail_under = 0.4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3928c49..08a1345 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,6 +25,7 @@ repos: hooks: - id: pylint name: pylint + exclude: ^(tests/.*|demo/*) entry: pylint language: system types: [python] @@ -78,13 +79,3 @@ repos: language: system pass_filenames: false stages: [commit] - - - repo: local - hooks: - - id: pytest-check - name: pytest-check-demo - entry: python -m pytest demo/tests - exclude: ^(.github|.circleci|docs|.flake8|.gitlint|.pylintrc|.docs.Dockerfile|README.md|Makefile|setup.py) - language: system - pass_filenames: false - stages: [commit] diff --git a/demo/data/input/bar.parquet b/demo/data/input/bar.parquet new file mode 100644 index 0000000..ea32de7 Binary files /dev/null and b/demo/data/input/bar.parquet differ diff --git a/demo/data/input/foo.csv b/demo/data/input/foo.csv new file mode 100644 index 0000000..b2f4a9e --- /dev/null +++ b/demo/data/input/foo.csv @@ -0,0 +1,11 @@ +,column_a,column_b,column_c,column_d +0,id1,Label_A,1001.0,999.0 +1,id2,Label_A,1002.0,998.0 +2,id3,Label_B,1003.0,997.0 +3,id4,Label_C,1004.0,996.0 +4,id5,Label_A,1005.0,995.0 +5,id6,Label_B,1006.0,994.0 +6,id7,Label_C,1007.0,993.0 +7,id8,Label_A,1008.0,992.0 +8,id9,Label_A,1009.0,991.0 +9,id10,Label_B,1010.0,990.0 diff --git a/demo/read_write_demo.py b/demo/read_write_demo.py new file mode 100644 index 0000000..7748011 --- /dev/null +++ b/demo/read_write_demo.py @@ -0,0 +1,24 @@ +"""Example of reading and writing data using dynamicio.""" +from pathlib import Path + +from pandera import Field, Float, SchemaModel, String +from pandera.typing import Series + +from dynamicio.handlers.file import ParquetFileResource + + +### Example 2 ### +class BarSchema(SchemaModel): + column_a: Series[String] = Field(unique=True) + column_b: Series[String] = Field(nullable=False) + column_c: Series[Float] = Field(gt=1000) + # column_d: Series[Float] = Field(lt=1000) + + class Config: + strict = "filter" + + +TEST_RESOURCES = Path(__file__).parent / "data" +resource = ParquetFileResource(path=TEST_RESOURCES / "input/bar.parquet").read(pa_schema=BarSchema) +df = resource.read() +print(df) # noqa: T201 diff --git a/demo/tests/__init__.py b/demo/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dynamicio/__init__.py b/dynamicio/__init__.py new file mode 100644 index 0000000..d300ca3 --- /dev/null +++ b/dynamicio/__init__.py @@ -0,0 +1 @@ +"""A package for wrapping your I/O operations.""" diff --git a/dynamicio/base.py b/dynamicio/base.py new file mode 100644 index 0000000..eadd664 --- /dev/null +++ b/dynamicio/base.py @@ -0,0 +1,126 @@ +# pylint: disable=no-name-in-module disable=invalid-name + +"""BaseResource class for creating various resources types.""" +from abc import ABC, abstractmethod +from typing import Optional, Type, TypeVar + +import pandas as pd +import pandera as pa +from pandera import SchemaModel +from pydantic import BaseModel + +SchemaType = TypeVar("SchemaType", bound=pa.SchemaModel) # TODO: utilise this + + +class BaseResource(BaseModel, ABC): + """BaseClass for resource classes.""" + + pa_schema: Optional[pa.SchemaModel] + validate_default: bool = True + log_metrics_default: bool = True + allow_no_schema: bool = False + + def read( + self, + validate: Optional[bool] = None, + log_metrics: Optional[bool] = None, + pa_schema: Optional[Type[SchemaModel]] = None, + ) -> pd.DataFrame: + """Read from resource. + + Read, then process. + + Args: + validate: Whether to validate the dataframe before writing. If not given, will validate if a schema is + available. + log_metrics: Whether to log metrics for the dataframe before writing. If not given, will log metrics if a + schema is available. + pa_schema: Schema to validate against. If not given, will use the schema defined to the resource. + If given, will override the resource schema. + + Returns: + Processed dataframe. + """ + self._check_injections() + df = self._resource_read() + return self._process(df, validate, log_metrics, pa_schema) + + def write( + self, + df: pd.DataFrame, + validate: Optional[bool] = None, + log_metrics: Optional[bool] = None, + pa_schema: Optional[Type[SchemaModel]] = None, + ) -> None: + """Write to resource. + + Process, then write. + + Args: + df: Dataframe to write. + validate: Whether to validate the dataframe before writing. If not given, will validate if a schema is + available. + log_metrics: Whether to log metrics for the dataframe before writing. If not given, will log metrics if a + schema is available. + pa_schema: Schema to validate against. If not given, will use the schema defined to the resource. + If given, will override the resource schema. + + Returns: + None + """ + self._check_injections() + df = self._process(df, validate, log_metrics, pa_schema) + return self._resource_write(df) + + def inject(self, **_) -> "BaseResource": + """Inject kwargs into resource paths/wherever relevant. Implement in subclass if needed.""" + return self + + def _process( + self, + df: pd.DataFrame, + validate: Optional[bool], + log_metrics: Optional[bool], + pa_schema: Optional[Type[SchemaModel]], + ) -> pd.DataFrame: + """Process data.""" + # Use defaults if not specified during read/write + if (validate is None and self.validate_default) or validate: + df = self._validate(df, pa_schema) + if (log_metrics is None and self.log_metrics_default) or log_metrics: + self._log_metrics(df) + + return df + + def _log_metrics(self, df: pd.DataFrame) -> None: + """Log metrics.""" + # TODO: implement this - tied to schema? + + def _validate(self, df: pd.DataFrame, pa_schema: Optional[Type[SchemaModel]] = None) -> pd.DataFrame: + """Validate dataframe.""" + if pa_schema is not None: + return pa_schema.validate(df) # type: ignore + if self.pa_schema is not None: + return self.pa_schema.validate(df) # type: ignore + if not self.allow_no_schema: + raise ValueError("No schema provided and allow_no_schema is False") + return df + + def _check_injections(self) -> None: + """Check that there are no missing injections. Implement in subclass if relevant.""" + + @abstractmethod + def _resource_read(self) -> pd.DataFrame: + """Read from resource.""" + raise NotImplementedError() + + @abstractmethod + def _resource_write(self, df) -> None: + """Write to resource.""" + raise NotImplementedError() + + class Config: # pylint: disable=missing-class-docstring + """Pydantic config.""" + + validate_assignment = True + allow_arbitrary_types = True diff --git a/dynamicio/handlers/__init__.py b/dynamicio/handlers/__init__.py new file mode 100644 index 0000000..b12e559 --- /dev/null +++ b/dynamicio/handlers/__init__.py @@ -0,0 +1,6 @@ +# flake8: noqa: I101 + +"""Functional handlers pydantic models for supported I/O targets.""" + +from dynamicio.handlers.file import CsvFileResource, HdfFileResource, JsonFileResource, ParquetFileResource +from dynamicio.handlers.keyed import KeyedResource diff --git a/dynamicio/handlers/file.py b/dynamicio/handlers/file.py new file mode 100644 index 0000000..78a7f72 --- /dev/null +++ b/dynamicio/handlers/file.py @@ -0,0 +1,80 @@ +# pylint: disable=protected-access +"""File handlers for dynamicio.""" +from copy import deepcopy +from pathlib import Path +from threading import Lock +from typing import Any, Callable, Dict + +import pandas as pd +from pydantic import Field + +from dynamicio import utils +from dynamicio.base import BaseResource +from dynamicio.inject import check_injections, inject + +hdf_lock = Lock() + + +class BaseFileResource(BaseResource): + """Base class for file resources.""" + + path: Path + kwargs: Dict[str, Any] = {} + _file_read_method: Callable[[Path, Any], Any] + _file_write_method: Callable[[pd.DataFrame, Path, Any], Any] + + def inject(self, **kwargs) -> "BaseFileResource": + """Inject variables into path. Immutable.""" + new = deepcopy(self) + new.path = inject(str(new.path), **kwargs) # type: ignore + return new + + def _check_injections(self) -> None: + """Check that all injections have been completed.""" + check_injections(str(self.path)) + + def _resource_read(self) -> pd.DataFrame: + """Read from file.""" + return self.__class__._file_read_method(self.path, **self.kwargs) # type: ignore + + def _resource_write(self, df: pd.DataFrame) -> None: + """Write to file.""" + self.path.parent.mkdir(parents=True, exist_ok=True) + self.__class__._file_write_method(df, self.path, **self.kwargs) # type: ignore + + +class HdfFileResource(BaseFileResource): + """HDF file resource.""" + + pickle_protocol: int = Field(4, ge=0, le=5) # Default covers python 3.4+ + + def _resource_read(self) -> pd.DataFrame: + """Read from HDF file.""" + with hdf_lock: + return super()._resource_read() + + def _resource_write(self, df: pd.DataFrame) -> None: + """Write to HDF file.""" + with utils.pickle_protocol(protocol=self.pickle_protocol), hdf_lock: + df.to_hdf(self.path, key="df", mode="w", **self.kwargs) + + +class CsvFileResource(BaseFileResource): + """CSV file resource.""" + + _file_read_method = pd.read_csv # type: ignore + _file_write_method = pd.DataFrame.to_csv + + +class JsonFileResource(BaseFileResource): + """JSON file resource.""" + + _file_read_method = pd.read_json # type: ignore + _file_write_method = pd.DataFrame.to_json + + +class ParquetFileResource(BaseFileResource): + """Parquet file resource.""" + + _file_read_method = pd.read_parquet + _file_write_method = pd.DataFrame.to_parquet diff --git a/dynamicio/handlers/keyed.py b/dynamicio/handlers/keyed.py new file mode 100644 index 0000000..d9f11c6 --- /dev/null +++ b/dynamicio/handlers/keyed.py @@ -0,0 +1,64 @@ +# pylint: disable=W0707 +"""KeyedResource class for reading and writing to different resources based on a key.""" + +import os +from copy import deepcopy +from typing import Dict, Optional + +import pandas as pd + +from dynamicio.base import BaseResource + + +class KeyedResource(BaseResource): + """A resource that can be read from and written to based on a key. + + define keyed_resources as a dict of resources keyed by a string. + + Warning: + key_env_var_name is case-insensitive and expects env vars to be uppercase. + """ + + keyed_resources: Dict[str, BaseResource] + default_key: str = "default" + selected_key: Optional[str] = None + + def __getitem__(self, key: str) -> BaseResource: + """Get resource by key.""" + return self.keyed_resources[key] + + def set_key_from_env(self, env_var_name: str = "DYNAMICIO_RESOURCE_KEY") -> "KeyedResource": + """Set key from environment variable. env_var_name defaults to self.key_env_var_name. Immutable.""" + new = deepcopy(self) + new.selected_key = os.environ.get(env_var_name.upper()) + return new + + def set_key(self, key: str) -> "KeyedResource": + """Set key explicitly. Immutable.""" + new = deepcopy(self) + new.selected_key = key + return new + + def inject(self, **kwargs) -> "KeyedResource": + """Inject kwargs into selected resource. Warning, correct resource needs to be selected first. Immutable.""" + new = deepcopy(self) + for key, resource in new.keyed_resources.items(): + new.keyed_resources[key] = resource.inject(**kwargs) + return new + + def _resource_read(self) -> pd.DataFrame: + key = self._get_key() + try: + resource = self.keyed_resources[key] + except KeyError: + raise KeyError(f"Resource key {key} not found in keyed_resources.") + return resource.read(validate=False, log_metrics=False) + + def _resource_write(self, df) -> None: + key = self._get_key() + self.keyed_resources[key].write(df, validate=False, log_metrics=False) + + def _get_key(self) -> str: + if self.selected_key: + return self.selected_key + return self.default_key diff --git a/dynamicio/inject.py b/dynamicio/inject.py new file mode 100644 index 0000000..a367c9c --- /dev/null +++ b/dynamicio/inject.py @@ -0,0 +1,107 @@ +"""Injects dynamic values into a string.""" + +import re +import string +from typing import Any, Dict + +double_bracket_matcher = re.compile(r"""(.*)(\[\[\s*(\S+)\s*]])(.*)""") +curly_braces_matcher = re.compile(r"(.*)(\{\s*(\S+)\s*\})(.*)") + + +class InjectionError(Exception): + """Raised when a string has any dynamic values in the form of "{DYNAMIC_VAR}" or "[[ DYNAMIC_VAR ]]".""" + + +def inject(value: str, **kwargs) -> str: + """Parse a string and replace any "{DYNAMIC_VAR}" and "[[ DYNAMIC_VAR ]]" with the respective values in the kwargs. + + case-insensitive. + Args: + value: A string with dynamic values in the form of "{DYNAMIC_VAR}" or "[[ DYNAMIC_VAR ]]". + kwargs: A mapping of values to replace in the path. + + Returns: + str: String with all dynamic values replaced. + """ + value = _inject_square_bracket_vars(value, **kwargs) + value = _inject_curly_braces_vars(value, **kwargs) + return value + + +def check_injections(value: str) -> None: + """Raise if a string has any dynamic values in the form of "{DYNAMIC_VAR}" or "[[ DYNAMIC_VAR ]]".""" + _check_square_bracket_injections(value) + _check_curly_braces_injections(value) + + +def _check_square_bracket_injections(value: str) -> None: + while _ := double_bracket_matcher.search(value): + raise InjectionError(f'Path is not fully injected: "{value!r}"') + + +def _check_curly_braces_injections(value: str) -> None: + fields = [group[1] for group in string.Formatter().parse(value) if group[1] is not None] + if len(fields) > 0: + raise InjectionError(f'Path is not fully injected: "{value!r}"') + + +def _inject_square_bracket_vars(value: str, **kwargs) -> str: + """Inject dynamic values in the form of "[[ DYNAMIC_VAR ]]". case-insensitive. + + Args: + value: A string with dynamic values in the form of "[[ DYNAMIC_VAR ]]". + kwargs: Any kwargs to inject into the string. + + Returns: + str: String with all dynamic values replaced. + """ + return _inject_with_matcher(value, double_bracket_matcher, **kwargs) + + +def _inject_curly_braces_vars(value: str, **kwargs) -> str: + """Parse a string and replace any "{DYNAMIC_VAR}" with the respective values in the kwargs. case-insensitive. + + Args: + path: A string with dynamic values in the form of "{DYNAMIC_VAR}". + kwargs: A mapping of values to replace in the path. + + Returns: + str: The path with the dynamic values replaced with the respective values in the kwargs. + """ + return _inject_with_matcher(value, curly_braces_matcher, **kwargs) + + +def _inject_with_matcher(value: str, matcher, **kwargs) -> str: + """Replaces any matching dynamic values. + + Args: + path: A string with dynamic values. + matcher: A regex matcher to find the dynamic values. + kwargs: A mapping of values to replace in the path. + + Returns: + str: The path with the dynamic values replaced with the respective values in the kwargs. + """ + kwargs_lower = {k.lower(): v for k, v in kwargs.items()} # case-insensitive + + replacements: Dict[str, Any] = {} + + temp_suffix_value = "" + + while result := matcher.search(value): + str_to_replace = result.group(3).lower() # we want to be case-insensitive + replacement = kwargs_lower.get(str_to_replace, None) + + if replacement is None: + suffix = matcher.sub("\\g<2>\\g<4>", value) + temp_suffix_value = f"{suffix}{temp_suffix_value}" + value = matcher.sub("\\g<1>", value) + else: + replacements[str_to_replace] = replacement + + # finds the first match and replaces it + value = matcher.sub(f"\\g<1>{replacement}\\g<4>", value) + + value = f"{value}{temp_suffix_value}" + + return value diff --git a/dynamicio/utils.py b/dynamicio/utils.py new file mode 100644 index 0000000..9bc6cbb --- /dev/null +++ b/dynamicio/utils.py @@ -0,0 +1,20 @@ +"""Utilities for dynamicio.""" + +from contextlib import contextmanager + + +@contextmanager +def pickle_protocol(protocol: int): + """Downgrade to the provided pickle protocol within the context manager. + + Args: + protocol: The number of the protocol HIGHEST_PROTOCOL to downgrade to. + """ + import pickle # pylint: disable=import-outside-toplevel + + previous = pickle.HIGHEST_PROTOCOL + try: + pickle.HIGHEST_PROTOCOL = protocol + yield + finally: + pickle.HIGHEST_PROTOCOL = previous diff --git a/dynamicio/validators.py b/dynamicio/validators.py new file mode 100644 index 0000000..fa45d12 --- /dev/null +++ b/dynamicio/validators.py @@ -0,0 +1 @@ +"""Custom validators for the dynamicio, to be used with pandera schemas.""" diff --git a/pyproject.toml b/pyproject.toml index 855818d..b36cb90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [tool.black] py38 = true -line-length = 185 +line-length = 120 include = '\.pyi?$' exclude = ''' ( diff --git a/requirements-dev.txt b/requirements-dev.txt index e96b75e..c1a1bab 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -18,3 +18,4 @@ types-PyYAML==6.0.12.2 types-setuptools==65.5.0.3 types-simplejson==3.17.7.2 yamllint==1.28.0 +pytest-mock==3.10.0 diff --git a/requirements.txt b/requirements.txt index bb39097..2ab1e5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ fsspec==2022.3.0 kafka-python~=2.0.2 logzero>=1.7.0 magic-logger>=1.0.2 -pandas>=1.2.4 +pandas~=1.2 psycopg2-binary~=2.9.3 pyarrow>=7.0.0 python-json-logger~=2.0.1 @@ -15,3 +15,4 @@ simplejson~=3.17.2 SQLAlchemy~=1.4.11 tables~=3.7.0 pydantic~=1.10.2 +pandera~=0.14.5 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3de9313 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,15 @@ +import shutil +from pathlib import Path +from typing import Generator + +import pytest + +from tests.constants import TEST_RESOURCES + + +@pytest.fixture(scope="session") +def output_dir_path() -> Generator[Path, None, None]: + output_dir_path = Path(TEST_RESOURCES / "data/temp/output") + yield output_dir_path + if output_dir_path.exists(): + shutil.rmtree(output_dir_path) diff --git a/tests/constants.py b/tests/constants.py new file mode 100644 index 0000000..33a6d92 --- /dev/null +++ b/tests/constants.py @@ -0,0 +1,5 @@ +"""A module with constants used in tests.""" + +from pathlib import Path + +TEST_RESOURCES = Path(__file__).parent / "resources" diff --git a/tests/resources/data/external/h5_with_more_columns.h5 b/tests/resources/data/external/h5_with_more_columns.h5 new file mode 100644 index 0000000..37b4fa4 Binary files /dev/null and b/tests/resources/data/external/h5_with_more_columns.h5 differ diff --git a/tests/resources/data/external/json_with_more_columns.json b/tests/resources/data/external/json_with_more_columns.json new file mode 100644 index 0000000..75bd97c --- /dev/null +++ b/tests/resources/data/external/json_with_more_columns.json @@ -0,0 +1,104 @@ +{ + "id":{ + "0":1, + "1":2, + "2":3, + "3":4, + "4":5, + "5":6, + "6":7, + "7":8, + "8":9, + "9":10, + "10":11, + "11":12, + "12":13, + "13":14, + "14":15 + }, + "foo_name":{ + "0":"foo_a", + "1":"foo_b", + "2":"foo_a", + "3":"foo_b", + "4":"foo_a", + "5":"foo_b", + "6":"foo_a", + "7":"foo_b", + "8":"foo_a", + "9":"foo_b", + "10":"foo_a", + "11":"foo_b", + "12":"foo_a", + "13":"foo_b", + "14":"foo_a" + }, + "bar":{ + "0":1, + "1":2, + "2":3, + "3":4, + "4":5, + "5":6, + "6":7, + "7":8, + "8":9, + "9":10, + "10":11, + "11":12, + "12":13, + "13":14, + "14":15 + }, + "bar_type":{ + "0":"my-type", + "1":"my-type", + "2":"my-type", + "3":"my-type", + "4":"my-type", + "5":"my-type", + "6":"my-type", + "7":"my-type", + "8":"my-type", + "9":"my-type", + "10":"my-type", + "11":"my-type", + "12":"my-type", + "13":"my-type", + "14":"my-type" + }, + "a_number":{ + "0":1500, + "1":1500, + "2":1500, + "3":1500, + "4":1500, + "5":1500, + "6":1500, + "7":1500, + "8":1500, + "9":1500, + "10":1500, + "11":1500, + "12":1500, + "13":1500, + "14":1500 + }, + "b_number":{ + "0":1600, + "1":1600, + "2":1600, + "3":1600, + "4":1600, + "5":1600, + "6":1600, + "7":1600, + "8":1600, + "9":1600, + "10":1600, + "11":1600, + "12":1600, + "13":1600, + "14":1600 + } +} \ No newline at end of file diff --git a/tests/resources/data/input/batch/hdf/part_01.h5 b/tests/resources/data/input/batch/hdf/part_01.h5 new file mode 100644 index 0000000..f8f5e23 Binary files /dev/null and b/tests/resources/data/input/batch/hdf/part_01.h5 differ diff --git a/tests/resources/data/input/batch/hdf/part_02.h5 b/tests/resources/data/input/batch/hdf/part_02.h5 new file mode 100644 index 0000000..62b6174 Binary files /dev/null and b/tests/resources/data/input/batch/hdf/part_02.h5 differ diff --git a/tests/resources/data/input/batch/not_just_hdf/part_01.h5 b/tests/resources/data/input/batch/not_just_hdf/part_01.h5 new file mode 100644 index 0000000..f8f5e23 Binary files /dev/null and b/tests/resources/data/input/batch/not_just_hdf/part_01.h5 differ diff --git a/tests/resources/data/input/batch/not_just_hdf/part_02.h5 b/tests/resources/data/input/batch/not_just_hdf/part_02.h5 new file mode 100644 index 0000000..62b6174 Binary files /dev/null and b/tests/resources/data/input/batch/not_just_hdf/part_02.h5 differ diff --git a/tests/resources/data/input/batch/not_just_hdf/something_to_ignore.txt b/tests/resources/data/input/batch/not_just_hdf/something_to_ignore.txt new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/data/input/batch/not_just_parquet/part_01.parquet b/tests/resources/data/input/batch/not_just_parquet/part_01.parquet new file mode 100644 index 0000000..fb5deb4 Binary files /dev/null and b/tests/resources/data/input/batch/not_just_parquet/part_01.parquet differ diff --git a/tests/resources/data/input/batch/not_just_parquet/part_02.parquet b/tests/resources/data/input/batch/not_just_parquet/part_02.parquet new file mode 100644 index 0000000..00934a8 Binary files /dev/null and b/tests/resources/data/input/batch/not_just_parquet/part_02.parquet differ diff --git a/tests/resources/data/input/batch/not_just_parquet/something_to_ignore.txt b/tests/resources/data/input/batch/not_just_parquet/something_to_ignore.txt new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/data/input/batch/parquet/part_01.parquet b/tests/resources/data/input/batch/parquet/part_01.parquet new file mode 100644 index 0000000..fb5deb4 Binary files /dev/null and b/tests/resources/data/input/batch/parquet/part_01.parquet differ diff --git a/tests/resources/data/input/batch/parquet/part_02.parquet b/tests/resources/data/input/batch/parquet/part_02.parquet new file mode 100644 index 0000000..00934a8 Binary files /dev/null and b/tests/resources/data/input/batch/parquet/part_02.parquet differ diff --git a/tests/resources/data/input/batch/parquet_w_empty_files/emptyfile.parquet b/tests/resources/data/input/batch/parquet_w_empty_files/emptyfile.parquet new file mode 100644 index 0000000..9eea44b Binary files /dev/null and b/tests/resources/data/input/batch/parquet_w_empty_files/emptyfile.parquet differ diff --git a/tests/resources/data/input/batch/parquet_w_empty_files/fullfile.parquet b/tests/resources/data/input/batch/parquet_w_empty_files/fullfile.parquet new file mode 100644 index 0000000..5fdaca4 Binary files /dev/null and b/tests/resources/data/input/batch/parquet_w_empty_files/fullfile.parquet differ diff --git a/tests/resources/data/input/parquet_sample.parquet b/tests/resources/data/input/parquet_sample.parquet new file mode 100644 index 0000000..e42a9db Binary files /dev/null and b/tests/resources/data/input/parquet_sample.parquet differ diff --git a/tests/resources/data/input/pg_parquet_sample.parquet b/tests/resources/data/input/pg_parquet_sample.parquet new file mode 100644 index 0000000..ddf1d30 Binary files /dev/null and b/tests/resources/data/input/pg_parquet_sample.parquet differ diff --git a/tests/resources/data/input/some_csv_to_read.csv b/tests/resources/data/input/some_csv_to_read.csv new file mode 100644 index 0000000..a6f8c5f --- /dev/null +++ b/tests/resources/data/input/some_csv_to_read.csv @@ -0,0 +1,16 @@ +id,foo_name,bar +1,name_a,1 +2,name_b,2 +3,name_a,3 +4,name_b,4 +5,name_a,5 +6,name_b,6 +7,name_a,7 +8,name_b,8 +9,name_a,9 +10,name_b,10 +11,name_a,11 +12,name_b,12 +13,name_a,13 +14,name_b,14 +15,name_a,15 \ No newline at end of file diff --git a/tests/resources/data/input/some_hdf_to_read.h5 b/tests/resources/data/input/some_hdf_to_read.h5 new file mode 100644 index 0000000..39e6995 Binary files /dev/null and b/tests/resources/data/input/some_hdf_to_read.h5 differ diff --git a/tests/resources/data/input/some_json_to_read.json b/tests/resources/data/input/some_json_to_read.json new file mode 100644 index 0000000..0e6bd00 --- /dev/null +++ b/tests/resources/data/input/some_json_to_read.json @@ -0,0 +1,53 @@ +{ + "id":{ + "0":1, + "1":2, + "2":3, + "3":4, + "4":5, + "5":6, + "6":7, + "7":8, + "8":9, + "9":10, + "10":11, + "11":12, + "12":13, + "13":14, + "14":15 + }, + "foo_name":{ + "0":"name_a", + "1":"name_b", + "2":"name_a", + "3":"name_b", + "4":"name_a", + "5":"name_b", + "6":"name_a", + "7":"name_b", + "8":"name_a", + "9":"name_b", + "10":"name_a", + "11":"name_b", + "12":"name_a", + "13":"name_b", + "14":"name_a" + }, + "bar":{ + "0":1, + "1":2, + "2":3, + "3":4, + "4":5, + "5":6, + "6":7, + "7":8, + "8":9, + "9":10, + "10":11, + "11":12, + "12":13, + "13":14, + "14":15 + } +} \ No newline at end of file diff --git a/tests/resources/data/processed/.gitkeep b/tests/resources/data/processed/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/data/processed/some_parquet_to_read.parquet b/tests/resources/data/processed/some_parquet_to_read.parquet new file mode 100644 index 0000000..e42a9db Binary files /dev/null and b/tests/resources/data/processed/some_parquet_to_read.parquet differ diff --git a/tests/resources/data/temp/.gitkeep b/tests/resources/data/temp/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/schemas.py b/tests/resources/schemas.py new file mode 100644 index 0000000..2cf414b --- /dev/null +++ b/tests/resources/schemas.py @@ -0,0 +1,10 @@ +# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring, R0801 + +from pandera import SchemaModel +from pandera.typing import Series + + +class SomeParquetToRead(SchemaModel): + id: Series[int] + bar: Series[int] + foo_name: Series[str] diff --git a/tests/test_inject.py b/tests/test_inject.py new file mode 100644 index 0000000..8fc7273 --- /dev/null +++ b/tests/test_inject.py @@ -0,0 +1,117 @@ +# flake8: noqa: I101 + +from datetime import datetime + +import numpy as np +import pytest + +from dynamicio.inject import ( + InjectionError, + _check_curly_braces_injections, + _check_square_bracket_injections, + _inject_curly_braces_vars, + _inject_square_bracket_vars, +) + + +def test_inject_square_bracket_vars(): + res = _inject_square_bracket_vars("hello [[ world ]]", world="there") + assert res == "hello there" + + +def test_inject_square_bracket_vars_kwargs_is_case_insensitive(): + res = _inject_square_bracket_vars("hello [[ world ]]", WORLD="there") + assert res == "hello there" + + +def test_inject_square_bracket_vars_value_is_case_insensitive(): + res = _inject_square_bracket_vars("hello [[ WOrLD ]]", world="there") + assert res == "hello there" + + +def test_inject_square_bracket_vars_matches_multiple(): + res = _inject_square_bracket_vars("[[ VAR1 ]]/[[VAR2]]", var1="hello", var2="there") + assert res == "hello/there" + + +def test_inject_square_bracket_vars_various_data_types(): + res = _inject_square_bracket_vars( + "[[ VAR1 ]]/[[ VAR2 ]]/[[ VAR3 ]]/[[ VAR4 ]]/[[ VAR5 ]]/[[ VAR6 ]]", + var1=1, + var2=[1, 2, 3], + var3={"hello": "there"}, + var4=34.98, + var5=datetime(2021, 1, 1), + var6=np.array([1, 2, 3]), + ) + assert res == "1/[1, 2, 3]/{'hello': 'there'}/34.98/2021-01-01 00:00:00/[1 2 3]" + + +def test_inject_square_bracket_vars_accepts_extra(): + res = _inject_square_bracket_vars("[[ VAR1 ]]", var1="hello", var2="there", var3="extra") + assert res == "hello" + + +def test_inject_curly_braces_vars(): + res = _inject_curly_braces_vars("hello {world}", world="there") + assert res == "hello there" + + +def test_inject_curly_braces_vars_kwargs_is_case_insensitive(): + res = _inject_curly_braces_vars("hello {world}", WORLD="there") + assert res == "hello there" + + +def test_inject_curly_braces_vars_value_is_case_insensitive(): + res = _inject_curly_braces_vars("hello {WOrLD}", world="there") + assert res == "hello there" + + +def test_inject_curly_braces_vars_matches_multiple(): + res = _inject_curly_braces_vars("{VAR1}/{VAR2}", var1="hello", var2="there") + assert res == "hello/there" + + +def test_inject_curly_braces_vars_various_data_types(): + res = _inject_curly_braces_vars( + "{VAR1}/{VAR2}/{VAR3}/{VAR4}/{VAR5}/{VAR6}", + var1=1, + var2=[1, 2, 3], + var3={"hello": "there"}, + var4=34.98, + var5=datetime(2021, 1, 1), + var6=np.array([1, 2, 3]), + ) + assert res == "1/[1, 2, 3]/{'hello': 'there'}/34.98/2021-01-01 00:00:00/[1 2 3]" + + +def test_inject_curly_braces_vars_accepts_extra(): + res = _inject_curly_braces_vars("{VAR1}", var1="hello", var2="there", var3="extra") + assert res == "hello" + + +def test_inject_curly_braces_accepts_no_vars_in_value(): + res = _inject_curly_braces_vars("hi", var1="hello") + assert res == "hi" + + +def test_inject_square_bracket_vars_works_correctly_with_multiple_some_not_injected(): + result = _inject_square_bracket_vars("[[ VAR1 ]]/[[ VAR2 ]]/[[ VAR3 ]]", var2="there") + assert result == "[[ VAR1 ]]/there/[[ VAR3 ]]" + + +def test_inject_curly_braces_vars_works_correctly_with_multiple_some_not_injected(): + result = _inject_curly_braces_vars("{VAR1}/{VAR2}/{VAR3}", var2="there") + assert result == "{VAR1}/there/{VAR3}" + + +def test__check_square_bracket_injections_throws_on_missing_var(): + with pytest.raises(InjectionError): + result = _inject_square_bracket_vars("[[ VAR1 ]]/[[ VAR2 ]]/[[ VAR3 ]]", var2="there") + _check_square_bracket_injections(result) + + +def test_inject_curly_braces_vars_throws_on_missing_var(): + with pytest.raises(InjectionError): + result = _inject_curly_braces_vars("{VAR1}", var2="there") + _check_curly_braces_injections(result) diff --git a/tests/test_parquet_file.py b/tests/test_parquet_file.py new file mode 100644 index 0000000..cafea8c --- /dev/null +++ b/tests/test_parquet_file.py @@ -0,0 +1,43 @@ +import pandas as pd +import pytest + +from dynamicio.handlers.file import ParquetFileResource +from tests import constants +from tests.resources.schemas import SomeParquetToRead + + +@pytest.fixture() +def parquet_file_resource() -> ParquetFileResource: + return ParquetFileResource( + path=f"{constants.TEST_RESOURCES}/data/input/parquet_sample.parquet", + allow_no_schema=True, + ) + + +@pytest.fixture() +def parquet_df(parquet_file_resource) -> pd.DataFrame: + return parquet_file_resource.read() + + +@pytest.fixture() +def parquet_write_resource() -> ParquetFileResource: + return ParquetFileResource( + path=f"{constants.TEST_RESOURCES}/data/processed/parquet_sample.parquet", + allow_no_schema=True, + ) + + +def test__resource_read(parquet_file_resource, parquet_df): + df = parquet_file_resource.read() + pd.testing.assert_frame_equal(df, parquet_df) + + +def test__resource_read_with_schema(parquet_file_resource, parquet_df): + df = parquet_file_resource.read(pa_schema=SomeParquetToRead) + pd.testing.assert_frame_equal(df, parquet_df) + + +def test__resource_write(parquet_write_resource, parquet_df): + parquet_write_resource.write(parquet_df) + df = pd.read_parquet(parquet_write_resource.path) + pd.testing.assert_frame_equal(df, parquet_df) diff --git a/tests/test_resources/__init__.py b/tests/test_resources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_resources/file/__init__.py b/tests/test_resources/file/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_resources/file/test_file.py b/tests/test_resources/file/test_file.py new file mode 100644 index 0000000..b98b5f1 --- /dev/null +++ b/tests/test_resources/file/test_file.py @@ -0,0 +1,67 @@ +from pathlib import Path + +import pandas as pd +import pytest +from mock import call + +from dynamicio.handlers.file import ParquetFileResource +from dynamicio.inject import InjectionError + + +def test_file_resource_inject_read(mocker): + mock_function = mocker.patch( + "dynamicio.handlers.file.ParquetFileResource._file_read_method", return_value=pd.DataFrame() + ) + + resource = ParquetFileResource(path="foo/{bar}/baz", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="baz") + resource.read() + + resource = ParquetFileResource(path="foo/[[bar]]/baz", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="boo") + resource.read() + + mock_function.assert_has_calls([call(Path("foo/baz/baz"), foo="bar"), call(Path("foo/boo/baz"), foo="bar")]) + + +def test_file_resource_inject_read_raises_on_incomplete_injection(): + resource = ParquetFileResource(path="foo/{bar}/{baz}", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="baz") + + with pytest.raises(InjectionError): + resource.read() + + resource = ParquetFileResource(path="foo/[[bar]]/[[baz]]", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="baz") + + with pytest.raises(InjectionError): + resource.read() + + +def test_file_resource_inject_write(mocker): + mock_function = mocker.patch( + "dynamicio.handlers.file.ParquetFileResource._file_write_method", return_value=pd.DataFrame() + ) + + df = pd.DataFrame() + resource = ParquetFileResource(path="foo/{bar}/baz", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="baz") + resource.write(df) + + resource = ParquetFileResource(path="foo/[[bar]]/baz", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="boo") + resource.write(df) + + mock_function.assert_has_calls([call(df, Path("foo/baz/baz"), foo="bar"), call(df, Path("foo/boo/baz"), foo="bar")]) + + +def test_file_resource_inject_write_raises_on_incomplete_injection(): + resource = ParquetFileResource(path="foo/{bar}/{baz}", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="baz") + with pytest.raises(InjectionError): + resource.write(pd.DataFrame()) + + resource = ParquetFileResource(path="foo/[[bar]]/[[baz]]", kwargs={"foo": "bar"}, allow_no_schema=True) + resource = resource.inject(bar="boo") + with pytest.raises(InjectionError): + resource.write(pd.DataFrame()) diff --git a/tests/test_resources/file/test_parquet.py b/tests/test_resources/file/test_parquet.py new file mode 100644 index 0000000..34e5c11 --- /dev/null +++ b/tests/test_resources/file/test_parquet.py @@ -0,0 +1,21 @@ +import pandas as pd + +from dynamicio.handlers import ParquetFileResource +from tests.constants import TEST_RESOURCES + + +def test_parquet_resource_read(): + test_path = TEST_RESOURCES / "data/input/parquet_sample.parquet" + resource = ParquetFileResource(path=test_path, allow_no_schema=True) + df = resource.read() + target_df = pd.read_parquet(test_path) + pd.testing.assert_frame_equal(df, target_df) + + +def test_parquet_resource_write(output_dir_path): + test_path = output_dir_path / "test_parquet_resource_write.parquet" + resource = ParquetFileResource(path=test_path, allow_no_schema=True) + df = pd.DataFrame({"A": [1, 2, 3], "B": ["a", "b", "c"]}) + resource.write(df) + target_df = pd.read_parquet(test_path) + pd.testing.assert_frame_equal(df, target_df)