Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
flake8 --verbose dynamicio
flake8 --verbose tests
pylint -v dynamicio
pylint -v tests
yamllint -v dynamicio
yamllint -v tests

Expand Down
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ omit =
*__init__*

[report]
fail_under = 90
fail_under = 0.4
11 changes: 1 addition & 10 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ repos:
hooks:
- id: pylint
name: pylint
exclude: ^(tests/.*|demo/*)
entry: pylint
language: system
types: [python]
Expand Down Expand Up @@ -78,13 +79,3 @@ repos:
language: system
pass_filenames: false
stages: [commit]

- repo: local
hooks:
- id: pytest-check
name: pytest-check-demo
entry: python -m pytest demo/tests
exclude: ^(.github|.circleci|docs|.flake8|.gitlint|.pylintrc|.docs.Dockerfile|README.md|Makefile|setup.py)
language: system
pass_filenames: false
stages: [commit]
Binary file added demo/data/input/bar.parquet
Binary file not shown.
11 changes: 11 additions & 0 deletions demo/data/input/foo.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
,column_a,column_b,column_c,column_d
0,id1,Label_A,1001.0,999.0
1,id2,Label_A,1002.0,998.0
2,id3,Label_B,1003.0,997.0
3,id4,Label_C,1004.0,996.0
4,id5,Label_A,1005.0,995.0
5,id6,Label_B,1006.0,994.0
6,id7,Label_C,1007.0,993.0
7,id8,Label_A,1008.0,992.0
8,id9,Label_A,1009.0,991.0
9,id10,Label_B,1010.0,990.0
24 changes: 24 additions & 0 deletions demo/read_write_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Example of reading and writing data using dynamicio."""
from pathlib import Path

from pandera import Field, Float, SchemaModel, String
from pandera.typing import Series

from dynamicio.handlers.file import ParquetFileResource


### Example 2 ###
class BarSchema(SchemaModel):
    """Pandera schema describing the columns expected in ``bar.parquet``.

    Each attribute declares one column together with the check applied to it.
    """

    # Every value in column_a must be distinct.
    column_a: Series[String] = Field(unique=True)
    # column_b may not contain nulls.
    column_b: Series[String] = Field(nullable=False)
    # All values in column_c must be strictly greater than 1000.
    column_c: Series[Float] = Field(gt=1000)
    # column_d: Series[Float] = Field(lt=1000)

    class Config:
        """Schema-wide pandera options."""

        # NOTE(review): presumably "filter" drops columns not declared above
        # instead of failing validation - confirm against the pandera docs.
        strict = "filter"


# Directory holding the demo's input data, resolved relative to this script.
TEST_RESOURCES = Path(__file__).parent / "data"

# Build the resource first, then read from it. ``read`` returns a DataFrame,
# so the resource object and the resulting frame must be kept separate;
# calling ``.read()`` on the returned DataFrame would fail.
resource = ParquetFileResource(path=TEST_RESOURCES / "input/bar.parquet")
df = resource.read(pa_schema=BarSchema)
print(df)  # noqa: T201
Empty file added demo/tests/__init__.py
Empty file.
1 change: 1 addition & 0 deletions dynamicio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""A package for wrapping your I/O operations."""
126 changes: 126 additions & 0 deletions dynamicio/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# pylint: disable=no-name-in-module disable=invalid-name

"""BaseResource class for creating various resources types."""
from abc import ABC, abstractmethod
from typing import Optional, Type, TypeVar

import pandas as pd
import pandera as pa
from pandera import SchemaModel
from pydantic import BaseModel

# Type variable constrained to pandera SchemaModel subclasses; declared for
# future use and not referenced elsewhere in this module yet.
SchemaType = TypeVar("SchemaType", bound=pa.SchemaModel)  # TODO: utilise this


class BaseResource(BaseModel, ABC):
    """Abstract base class for resource classes.

    A resource knows how to read a dataframe from, and write a dataframe to,
    some backing store. Subclasses implement ``_resource_read`` and
    ``_resource_write``; this class adds optional schema validation and metric
    logging around those calls.

    Attributes are pydantic fields:
        pa_schema: Schema used for validation when none is passed to
            ``read``/``write``.
        validate_default: Whether to validate when ``validate`` is not given.
        log_metrics_default: Whether to log metrics when ``log_metrics`` is
            not given.
        allow_no_schema: If False, validating without any schema raises.
    """

    # Explicit default so the field stays optional regardless of how the
    # installed pydantic version treats ``Optional`` fields without a default.
    pa_schema: Optional[pa.SchemaModel] = None
    validate_default: bool = True
    log_metrics_default: bool = True
    allow_no_schema: bool = False

    def read(
        self,
        validate: Optional[bool] = None,
        log_metrics: Optional[bool] = None,
        pa_schema: Optional[Type[SchemaModel]] = None,
    ) -> pd.DataFrame:
        """Read from resource.

        Read, then process.

        Args:
            validate: Whether to validate the dataframe after reading. If not given, falls back to
                ``validate_default``.
            log_metrics: Whether to log metrics for the dataframe after reading. If not given, falls back to
                ``log_metrics_default``.
            pa_schema: Schema to validate against. If not given, will use the schema defined on the resource.
                If given, will override the resource schema.

        Returns:
            Processed dataframe.

        Raises:
            ValueError: If validation is requested, no schema is available and ``allow_no_schema`` is False.
        """
        self._check_injections()
        df = self._resource_read()
        return self._process(df, validate, log_metrics, pa_schema)

    def write(
        self,
        df: pd.DataFrame,
        validate: Optional[bool] = None,
        log_metrics: Optional[bool] = None,
        pa_schema: Optional[Type[SchemaModel]] = None,
    ) -> None:
        """Write to resource.

        Process, then write.

        Args:
            df: Dataframe to write.
            validate: Whether to validate the dataframe before writing. If not given, falls back to
                ``validate_default``.
            log_metrics: Whether to log metrics for the dataframe before writing. If not given, falls back to
                ``log_metrics_default``.
            pa_schema: Schema to validate against. If not given, will use the schema defined on the resource.
                If given, will override the resource schema.

        Returns:
            None

        Raises:
            ValueError: If validation is requested, no schema is available and ``allow_no_schema`` is False.
        """
        self._check_injections()
        df = self._process(df, validate, log_metrics, pa_schema)
        return self._resource_write(df)

    def inject(self, **_) -> "BaseResource":
        """Inject kwargs into resource paths/wherever relevant. Implement in subclass if needed."""
        return self

    def _process(
        self,
        df: pd.DataFrame,
        validate: Optional[bool],
        log_metrics: Optional[bool],
        pa_schema: Optional[Type[SchemaModel]],
    ) -> pd.DataFrame:
        """Validate and/or log metrics for ``df`` and return the resulting dataframe.

        ``None`` for ``validate``/``log_metrics`` means "use the resource default".
        """
        should_validate = self.validate_default if validate is None else validate
        should_log_metrics = self.log_metrics_default if log_metrics is None else log_metrics

        if should_validate:
            df = self._validate(df, pa_schema)
        if should_log_metrics:
            self._log_metrics(df)

        return df

    def _log_metrics(self, df: pd.DataFrame) -> None:
        """Log metrics. Currently a no-op."""
        # TODO: implement this - tied to schema?

    def _validate(self, df: pd.DataFrame, pa_schema: Optional[Type[SchemaModel]] = None) -> pd.DataFrame:
        """Validate dataframe.

        The schema passed in takes precedence over the one stored on the resource.

        Raises:
            ValueError: If neither schema is available and ``allow_no_schema`` is False.
        """
        schema = pa_schema if pa_schema is not None else self.pa_schema
        if schema is not None:
            return schema.validate(df)  # type: ignore
        if not self.allow_no_schema:
            raise ValueError("No schema provided and allow_no_schema is False")
        return df

    def _check_injections(self) -> None:
        """Check that there are no missing injections. Implement in subclass if relevant."""

    @abstractmethod
    def _resource_read(self) -> pd.DataFrame:
        """Read from resource."""
        raise NotImplementedError()

    @abstractmethod
    def _resource_write(self, df: pd.DataFrame) -> None:
        """Write to resource."""
        raise NotImplementedError()

    class Config:  # pylint: disable=missing-class-docstring
        """Pydantic config."""

        validate_assignment = True
        # NOTE: pydantic spells this option ``arbitrary_types_allowed``.
        arbitrary_types_allowed = True
6 changes: 6 additions & 0 deletions dynamicio/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# flake8: noqa: I101

"""Functional handlers pydantic models for supported I/O targets."""

from dynamicio.handlers.file import CsvFileResource, HdfFileResource, JsonFileResource, ParquetFileResource
from dynamicio.handlers.keyed import KeyedResource
80 changes: 80 additions & 0 deletions dynamicio/handlers/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# pylint: disable=protected-access
"""File handlers for dynamicio."""
from copy import deepcopy
from pathlib import Path
from threading import Lock
from typing import Any, Callable, Dict

import pandas as pd
from pydantic import Field

from dynamicio import utils
from dynamicio.base import BaseResource
from dynamicio.inject import check_injections, inject

# Module-wide lock held by HdfFileResource around both its read and its write,
# so HDF operations in this process never overlap across threads.
hdf_lock = Lock()


class BaseFileResource(BaseResource):
    """Shared behaviour for resources backed by a single file.

    Subclasses provide ``_file_read_method`` and ``_file_write_method`` as
    class attributes; ``kwargs`` is forwarded unchanged to both of them.
    """

    path: Path
    kwargs: Dict[str, Any] = {}
    _file_read_method: Callable[[Path, Any], Any]
    _file_write_method: Callable[[pd.DataFrame, Path, Any], Any]

    def inject(self, **kwargs) -> "BaseFileResource":
        """Return a deep copy of this resource with ``kwargs`` injected into its path."""
        clone = deepcopy(self)
        injected_path = inject(str(clone.path), **kwargs)
        clone.path = injected_path  # type: ignore
        return clone

    def _check_injections(self) -> None:
        """Run the injection check against the string form of the path."""
        path_text = str(self.path)
        check_injections(path_text)

    def _resource_read(self) -> pd.DataFrame:
        """Read the file at ``path`` using the subclass's read method."""
        # Looked up on the class rather than the instance, presumably so the
        # callable is not bound with ``self`` as its first argument.
        reader = self.__class__._file_read_method  # type: ignore
        return reader(self.path, **self.kwargs)

    def _resource_write(self, df: pd.DataFrame) -> None:
        """Write ``df`` to ``path``, creating missing parent directories first."""
        parent_dir = self.path.parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        writer = self.__class__._file_write_method  # type: ignore
        writer(df, self.path, **self.kwargs)


class HdfFileResource(BaseFileResource):
    """HDF file resource.

    Reads and writes are serialised through the module-level ``hdf_lock``.
    Writes always use ``key="df"`` and ``mode="w"``.
    """

    pickle_protocol: int = Field(4, ge=0, le=5)  # Default covers python 3.4+

    # The inherited ``_resource_read`` looks this attribute up on the class;
    # it must be defined here, as it is on the other file resources.
    _file_read_method = pd.read_hdf  # type: ignore

    def _resource_read(self) -> pd.DataFrame:
        """Read from HDF file while holding the HDF lock."""
        with hdf_lock:
            return super()._resource_read()

    def _resource_write(self, df: pd.DataFrame) -> None:
        """Write to HDF file while holding the HDF lock.

        NOTE(review): ``utils.pickle_protocol`` presumably switches the pickle
        protocol for the duration of the write - confirm in ``dynamicio.utils``.
        Unlike the base implementation, this does not create missing parent
        directories.
        """
        with utils.pickle_protocol(protocol=self.pickle_protocol), hdf_lock:
            df.to_hdf(self.path, key="df", mode="w", **self.kwargs)


class CsvFileResource(BaseFileResource):
    """File resource that reads and writes CSV through pandas."""

    # Writer is called as ``to_csv(df, path, **kwargs)``.
    _file_write_method = pd.DataFrame.to_csv
    # Reader is called as ``read_csv(path, **kwargs)``.
    _file_read_method = pd.read_csv  # type: ignore


class JsonFileResource(BaseFileResource):
    """File resource that reads and writes JSON through pandas."""

    # Writer is called as ``to_json(df, path, **kwargs)``.
    _file_write_method = pd.DataFrame.to_json
    # Reader is called as ``read_json(path, **kwargs)``.
    _file_read_method = pd.read_json  # type: ignore


class ParquetFileResource(BaseFileResource):
    """File resource that reads and writes Parquet through pandas."""

    # Writer is called as ``to_parquet(df, path, **kwargs)``.
    _file_write_method = pd.DataFrame.to_parquet
    # Reader is called as ``read_parquet(path, **kwargs)``.
    _file_read_method = pd.read_parquet
64 changes: 64 additions & 0 deletions dynamicio/handlers/keyed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# pylint: disable=W0707
"""KeyedResource class for reading and writing to different resources based on a key."""

import os
from copy import deepcopy
from typing import Dict, Optional

import pandas as pd

from dynamicio.base import BaseResource


class KeyedResource(BaseResource):
    """A resource that can be read from and written to based on a key.

    Define ``keyed_resources`` as a dict of resources keyed by a string. Reads
    and writes are delegated to the resource stored under ``selected_key``, or
    under ``default_key`` when no key has been selected.

    Warning:
        ``set_key_from_env`` upper-cases the environment variable name before
        looking it up, so the variable itself must be defined in uppercase.
    """

    keyed_resources: Dict[str, BaseResource]
    default_key: str = "default"
    selected_key: Optional[str] = None

    def __getitem__(self, key: str) -> BaseResource:
        """Get resource by key."""
        return self.keyed_resources[key]

    def set_key_from_env(self, env_var_name: str = "DYNAMICIO_RESOURCE_KEY") -> "KeyedResource":
        """Return a copy whose key is taken from an environment variable.

        Args:
            env_var_name: Name of the environment variable; it is upper-cased before lookup.

        Returns:
            A deep copy of this resource. If the variable is unset, ``selected_key`` is None and the
            default key applies.
        """
        new = deepcopy(self)
        new.selected_key = os.environ.get(env_var_name.upper())
        return new

    def set_key(self, key: str) -> "KeyedResource":
        """Return a deep copy of this resource with ``key`` selected."""
        new = deepcopy(self)
        new.selected_key = key
        return new

    def inject(self, **kwargs) -> "KeyedResource":
        """Return a deep copy with ``kwargs`` injected into every keyed resource."""
        new = deepcopy(self)
        for key, resource in new.keyed_resources.items():
            # Only values are replaced, never keys, so mutating while iterating is safe.
            new.keyed_resources[key] = resource.inject(**kwargs)
        return new

    def _resource_read(self) -> pd.DataFrame:
        """Read from the active resource.

        Validation and metric logging are disabled on the inner call because
        this resource's own ``read`` handles them.
        """
        return self._get_resource().read(validate=False, log_metrics=False)

    def _resource_write(self, df: pd.DataFrame) -> None:
        """Write to the active resource.

        Validation and metric logging are disabled on the inner call because
        this resource's own ``write`` handles them.
        """
        self._get_resource().write(df, validate=False, log_metrics=False)

    def _get_resource(self) -> BaseResource:
        """Return the resource stored under the active key.

        Raises:
            KeyError: If the active key is not present in ``keyed_resources``.
        """
        key = self._get_key()
        try:
            return self.keyed_resources[key]
        except KeyError as err:
            raise KeyError(f"Resource key {key} not found in keyed_resources.") from err

    def _get_key(self) -> str:
        """Return ``selected_key`` when set and non-empty, otherwise ``default_key``."""
        return self.selected_key or self.default_key
Loading