
feat: Add S3, Postgres & Kafka handlers#51

Merged
joloppo merged 43 commits into v5 from pg_kafka_handlers on Apr 27, 2023

@joloppo
Contributor

@joloppo joloppo commented Apr 12, 2023

This adds more handlers.

Still missing: batched IO, logging/metrics, and two special validations.

Jira ticket

Quick Start

What your code might look like without dynamicio

# schema_module.py
from pandera import Field, SchemaModel  
from pandera.typing import Series

class BarSchema(SchemaModel):  
    column_a: Series[str] = Field(unique=True)

# data_loading.py
from schema_module import BarSchema
import pandas as pd

proj_bucket = "my_bucket"
proj_prefix = "my_prefix"

df = pd.read_csv(f"s3://{proj_bucket}/{proj_prefix}/cargo_data.csv")  
df = BarSchema.validate(df)

Using dynamicio

# schema_module.py
from pandera import Field, SchemaModel, String
from pandera.typing import Series

class BarSchema(SchemaModel):  
    column_a: Series[String] = Field(unique=True)

# data_loading.py
from schema_module import BarSchema
from dynamicio import S3CsvResource

resource = S3CsvResource(
    bucket="{proj_bucket}",
    path="{proj_prefix}/cargo_data.csv",
    pa_schema=BarSchema,
)

resource = resource.inject(proj_bucket="my_bucket", proj_prefix="my_prefix")

df = resource.read()

That looks pretty similar. What else can you do?

Validation

Validation is handled by pandera (docs here). Resources expect a pa_schema by default, passed either when you create the resource or when you perform IO actions. You can opt out by setting allow_no_schema to True.

Resource classes

Resources are pydantic models, which means any necessary configuration is validated and can be autocompleted by your IDE when you create a resource. They all inherit from a single BaseResource god object, which looks like this:

  
from abc import ABC
from copy import deepcopy
from typing import Optional, Type

import pandas as pd
from pandera import SchemaModel
from pydantic import BaseModel


class BaseResource(BaseModel, ABC):
    # A schema class (not an instance), matching the read/write signatures below.
    pa_schema: Optional[Type[SchemaModel]]
    disable_validation: bool = False
    log_metrics_default: bool = True
    allow_no_schema: bool = False

    def inject(self, **kwargs) -> "BaseResource":
        """Inject kwargs into relevant resource attributes. Immutable."""
        return deepcopy(self)

    def read(
        self,
        validate: Optional[bool] = None,
        log_metrics: Optional[bool] = None,
        pa_schema: Optional[Type[SchemaModel]] = None,
    ) -> pd.DataFrame:
        ...

    def write(
        self,
        df: pd.DataFrame,
        validate: Optional[bool] = None,
        log_metrics: Optional[bool] = None,
        pa_schema: Optional[Type[SchemaModel]] = None,
    ) -> None:
        ...
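
The optional validate and pa_schema arguments on read/write suggest that per-call values override the resource's defaults. The following is only a sketch of how that resolution might work, not the actual dynamicio implementation: ResourceDefaults, resolve_validation and SchemaMissingError are hypothetical names, and the precedence rules are an assumption based on the description above.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


class SchemaMissingError(ValueError):
    """Hypothetical error; not a confirmed dynamicio exception."""


@dataclass(frozen=True)
class ResourceDefaults:
    # Stand-ins for the BaseResource fields of the same names.
    pa_schema: Optional[type] = None
    disable_validation: bool = False
    allow_no_schema: bool = False


def resolve_validation(
    defaults: ResourceDefaults,
    validate: Optional[bool] = None,
    pa_schema: Optional[type] = None,
) -> Tuple[bool, Optional[type]]:
    """Return (should_validate, schema) for a single read/write call."""
    # Assumption: a schema passed at call time wins over the resource's own.
    schema = pa_schema if pa_schema is not None else defaults.pa_schema
    if schema is None:
        if not defaults.allow_no_schema:
            raise SchemaMissingError("no pa_schema given and allow_no_schema is False")
        return False, None
    # Assumption: validate=None means "fall back to the resource default".
    if validate is None:
        return (not defaults.disable_validation), schema
    return validate, schema
```

With this shape, a resource created with a schema validates by default, a call with validate=False skips validation for that call only, and a resource without a schema fails unless allow_no_schema is set.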

Available resources

  • BaseFileResource (internal)
  • CsvFileResource
  • JsonFileResource
  • ParquetFileResource
  • HdfFileResource

  • BaseS3Resource (internal)
  • S3CsvResource
  • S3JsonResource
  • S3ParquetResource
  • S3HdfResource

  • PostgresResource
  • KafkaResource

Injection

dynamicio supports injection into paths/strings when using {var1} or [[var2]] syntax. We should only have one of those.

  • .inject returns a modified copy of your resource.
    • .inject should be called .clone_with_injections
  • Injection can be done partially and is only checked on read/write.
  • Injecting None values will do nothing
  • Injecting values that are not in the injectables will also do nothing -> you can call .inject(**os.environ) if you really want :( This follows the old method.
  • Depending on your resource, different attributes can be injected. S3 and Files are quite obvious
    • For Postgres you can inject all of: db_user db_password db_host db_port db_name table_name sql_query
# resource_place.py
resource = S3CsvResource(
    bucket="{proj_bucket}",
    path="day_{day_value}/cargo_data.csv",
    pa_schema=BarSchema,
)

# Inject is an immutable action. 
resource = resource.inject(proj_bucket="my_bucket")

# failing_code.py
df = resource.read()  # <- This would raise an InjectionError.

# succeeding_code.py
for day in [1, 2, 3]:
    df = resource.inject(day_value=day).read()
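
To make the injection rules above concrete, here is a rough stdlib-only sketch of how they could behave: unknown and None values are ignored, inject returns a copy, and leftover placeholders only fail at read time. It is not dynamicio's code. PathResource, inject_string and resolved_uri are hypothetical names, and InjectionError is redefined locally as a stand-in.

```python
import re
from dataclasses import dataclass, replace

# Matches {name} or [[ name ]], the two placeholder styles mentioned above.
_PLACEHOLDER = re.compile(r"\{(\w+)\}|\[\[\s*(\w+)\s*\]\]")


class InjectionError(ValueError):
    """Local stand-in for the InjectionError named above."""


def inject_string(template: str, /, **values) -> str:
    """Fill known placeholders; leave unknown or None-valued ones untouched."""
    def substitute(match):
        name = match.group(1) or match.group(2)
        value = values.get(name)
        return match.group(0) if value is None else str(value)
    return _PLACEHOLDER.sub(substitute, template)


@dataclass(frozen=True)
class PathResource:
    bucket: str
    path: str

    def inject(self, /, **values) -> "PathResource":
        # Returns a modified copy; the original is never mutated.
        return replace(
            self,
            bucket=inject_string(self.bucket, **values),
            path=inject_string(self.path, **values),
        )

    def resolved_uri(self) -> str:
        # Checked only at read/write time, so partial injection is fine earlier.
        uri = f"s3://{self.bucket}/{self.path}"
        leftover = _PLACEHOLDER.search(uri)
        if leftover:
            raise InjectionError(f"placeholder not injected: {leftover.group(0)}")
        return uri


resource = PathResource(bucket="{proj_bucket}", path="day_{day_value}/cargo_data.csv")
partial = resource.inject(proj_bucket="my_bucket", unused="ignored")
uris = [partial.inject(day_value=day).resolved_uri() for day in (1, 2, 3)]
```

Because values that match no placeholder are silently dropped, passing a large mapping such as os.environ works in this sketch too, for better or worse.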

KeyedResources (formerly 'environments')

KeyedResource exists so you can keep several resources under different keys and switch between them. Setting the key is an immutable action, much like inject. Example:

res = KeyedResource(
    keyed_resources={
        "cloud": S3ParquetResource(
            bucket="mybucket",
            path="mypath/[[ file_name ]].parquet",
            allow_no_schema=True,
        ),
        "local": ParquetFileResource(
            path="mypath/file.parquet", allow_no_schema=True
        ),
    },
    default_key="local",
)
res = res.inject(file_name="some_name")
res = res.set_key("cloud") # <- special method for KeyedResource

df = res.read()

Previously we called these 'environments', and for most use cases the key was either 'cloud' or 'local'. This can still be used for testing, as it was previously.

However, this would be great for special dev environments, for example when you don't want to spin up any db/services in dev. Once testing is delegated to uhura, this would probably be the only use case.

Other:

  • inject injects into all resources when called, so the order of inject and set_key is irrelevant.
  • there is currently a .set_key_from_env as well, but I think that shouldn't exist.
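
The claim that the order of inject and set_key is irrelevant follows from inject touching every keyed resource rather than only the active one. A minimal sketch of that idea, using hypothetical stand-in classes (FakeResource and KeyedSketch are not dynamicio classes, and the real KeyedResource may differ):

```python
from dataclasses import dataclass, replace
from typing import Dict


@dataclass(frozen=True)
class FakeResource:
    """Minimal stand-in for any resource with an immutable inject()."""
    path: str

    def inject(self, /, **values) -> "FakeResource":
        new_path = self.path
        for name, value in values.items():
            new_path = new_path.replace("[[ " + name + " ]]", str(value))
        return replace(self, path=new_path)

    def read(self) -> str:
        return f"read from {self.path}"


@dataclass(frozen=True)
class KeyedSketch:
    keyed_resources: Dict[str, FakeResource]
    key: str

    def set_key(self, key: str) -> "KeyedSketch":
        # Returns a copy pointing at another resource; self is unchanged.
        if key not in self.keyed_resources:
            raise KeyError(key)
        return replace(self, key=key)

    def inject(self, /, **values) -> "KeyedSketch":
        # Inject into every resource, not just the currently selected one.
        injected = {k: r.inject(**values) for k, r in self.keyed_resources.items()}
        return replace(self, keyed_resources=injected)

    def read(self) -> str:
        return self.keyed_resources[self.key].read()


res = KeyedSketch(
    keyed_resources={
        "cloud": FakeResource("mypath/[[ file_name ]].parquet"),
        "local": FakeResource("mypath/file.parquet"),
    },
    key="local",
)
inject_first = res.inject(file_name="some_name").set_key("cloud")
key_first = res.set_key("cloud").inject(file_name="some_name")
```

Both inject_first and key_first end up reading from the same cloud path, while res itself still points at "local".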

@joloppo joloppo marked this pull request as ready for review April 12, 2023 16:39
@joloppo joloppo requested a review from a team April 12, 2023 16:39
Comment thread dynamicio/handlers/file.py Outdated
Comment thread dynamicio/handlers/s3/file.py Outdated
Comment thread dynamicio/base.py Outdated
SchemaType = TypeVar("SchemaType", bound=pa.SchemaModel) # TODO: utilise this


class BaseResource(BaseModel, ABC):
Contributor

@arturk-vortexa arturk-vortexa Apr 13, 2023

Actually, now that I think about it, it seems that BaseResource is both a configuration class, and it contains IO logic... Is this necessary? Could we not pass in a pydantic config object into the resource class instead of inheriting from BaseModel?

Contributor Author

So, this really is the crux of the matter. There are several ways to go about this. My first implementation was along these lines: create a resource (just a pydantic model, no functions) and then pass it to a read/write function that takes a resource of a specific type. The reason I combined these into one was to keep usage simple. Maybe we can achieve this by different means.

@deaglancrew and I had a long discussion about using a creational builder pattern here, separating out configuration, io, injection, validation and logging into their own classes. We have not yet found a technical solution that gives it a nice API, and we haven't figured out how to connect the dots between injection and reading either. Imo, there might be a tradeoff between usability+readability on one side and testability/SOLID on the other. Let's have a look at what we can do here in our pairing later.

Contributor

@arturk-vortexa arturk-vortexa Apr 13, 2023

From an OOP perspective, I'd love to have what Declan suggested. Separate all the responsibilities out into their own classes. But I'm not 100% sure what the repercussions are in terms of API usability 🤔 I've seen a lot of open source repos actually implementing a "frontend layer." So you do whatever is necessary from an OOP perspective on the backend, but then an additional "frontend" layer is purely responsible for the API. We could adopt a similar pattern

Contributor

Actually, perhaps this makes sense. The "frontend" is a separate concern...

Contributor Author

A facade pretty much. I'd be happy with this - once I figure out an example of how to do it.

Contributor

Maybe we could find some inspiration on github
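
For reference, the "frontend layer" / facade idea discussed in this thread could look roughly like the sketch below: configuration, IO and validation live in separate classes, and a thin facade offers the single read() call. All names here (ResourceConfig, InMemoryReader, NonEmptyValidator, ResourceFacade) are hypothetical and only illustrate the pattern; none of them exist in dynamicio.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ResourceConfig:
    """Configuration only; no IO logic."""
    path: str


class InMemoryReader:
    """IO only; a dict stands in for S3, files, Postgres, etc."""
    def __init__(self, store: Dict[str, List[List[str]]]) -> None:
        self._store = store

    def read(self, config: ResourceConfig) -> List[List[str]]:
        return self._store[config.path]


class NonEmptyValidator:
    """Validation only; a trivial stand-in for a pandera schema check."""
    def validate(self, rows: List[List[str]]) -> List[List[str]]:
        if not rows:
            raise ValueError("no rows")
        return rows


class ResourceFacade:
    """User-facing API that wires the separate responsibilities together."""
    def __init__(
        self,
        config: ResourceConfig,
        reader: InMemoryReader,
        validator: Optional[NonEmptyValidator] = None,
    ) -> None:
        self._config = config
        self._reader = reader
        self._validator = validator

    def read(self) -> List[List[str]]:
        rows = self._reader.read(self._config)
        return self._validator.validate(rows) if self._validator else rows


facade = ResourceFacade(
    ResourceConfig(path="a.csv"),
    InMemoryReader({"a.csv": [["x", "y"]], "empty.csv": []}),
    NonEmptyValidator(),
)
rows = facade.read()
```

Each backend class can then be tested in isolation, while callers only ever see the facade.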

Comment thread dynamicio/handlers/kafka.py Outdated
Comment thread dynamicio/handlers/kafka.py Outdated
Comment thread dynamicio/handlers/kafka.py Outdated
Comment thread dynamicio/handlers/postgres.py Outdated
Comment thread dynamicio/handlers/postgres.py Outdated
Comment thread dynamicio/handlers/postgres.py Outdated
Comment thread dynamicio/handlers/postgres.py
Comment thread dynamicio/handlers/s3/file.py Outdated
Comment thread dynamicio/inject.py
Comment thread dynamicio/io/keyed.py
from pandera import SchemaModel


class IOConfig(Protocol):
Contributor

Very Pythonic usage of protocols instead of ABC metaclasses!

Contributor Author

Yeah, imo we should be using ABC. XD

@joloppo joloppo merged commit 0c43d7f into v5 Apr 27, 2023
@arturk-vortexa arturk-vortexa mentioned this pull request Aug 4, 2023