Skip to content
This repository was archived by the owner on Jul 15, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions flows/enexis/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Runtime image for the Enexis Prefect flow, based on the official Prefect image.
FROM prefecthq/prefect:latest

RUN pip install --upgrade pip

WORKDIR /opt/prefect

# Shared GCP helpers used by the flow (see src/util/gcp/gcs.py).
COPY ./src/util/ /opt/prefect/util/
COPY ./requirements.txt .
COPY ./setup.py .

# Installs the "enexis" package with requirements.txt as dependencies; pydantic
# is built from source (--no-binary) — presumably to work around a binary-wheel
# incompatibility with the base image; TODO confirm.
RUN pip install . --no-binary pydantic

# Flow code is copied after dependency installation so code-only changes do
# not invalidate the cached dependency layers.
COPY ./src/enexis/ /opt/prefect/enexis/

# NOTE(review): the base image already ships prefect — this reinstall looks
# redundant; confirm whether a specific version pin was intended instead.
RUN pip install prefect
16 changes: 16 additions & 0 deletions flows/enexis/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: "3.7"

services:
  enexis:
    build:
      context: .
      dockerfile: Dockerfile
    # Run the multi-line command below through a shell.
    entrypoint: ["/bin/sh", "-c"]
    # Image tag comes from the $image variable (expected in .env).
    image: $image
    command:
      - |
        prefect backend cloud
        prefect auth login --key $prefect_api_key
        python3 /opt/prefect/enexis/flow.py
    # Supplies $image and $prefect_api_key at runtime.
    env_file:
      - .env
113 changes: 113 additions & 0 deletions flows/enexis/enexis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import wget
import os
from pyarrow import csv
import pyarrow.parquet as pq
from google.cloud import storage
from google.oauth2 import service_account

import prefect
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.secrets import PrefectSecret

# Years of Enexis "kleinverbruik" (small-consumption) data to download.
enexis_data = ["2019", "2020", "2021", "2022"]

@task
def create_dir(dir):
    """Create the download directory and return its path.

    Uses makedirs with exist_ok=True so repeated flow runs do not crash:
    the original os.mkdir raised FileExistsError when the directory was
    already present from an earlier run.
    """
    os.makedirs(dir, exist_ok=True)
    return dir


@task
def get_csv(years, directory):
    """Download one year of Enexis kleinverbruik CSV data.

    Any leftover file from a previous run is removed first (so wget does
    not write a numbered duplicate). Returns the local CSV path.
    """
    target = f"{directory}/enexis-kleinverbruik-{years}.csv"
    source = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv"

    if os.path.exists(target):
        os.remove(target)

    wget.download(source, target)
    return target


@task
def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","):
    """Convert a CSV file to Parquet and remove the source CSV.

    Args:
        csv_file: path of the CSV to convert.
        encoding: text encoding of the CSV (passed to pyarrow's reader).
        delimiter: field separator of the CSV.

    Returns:
        Path of the written Parquet file.
    """
    # Removed a stray no-op expression statement (`encoding` on its own line).
    # os.path.splitext keeps the full stem even if the directory name contains
    # a dot, unlike the previous csv_file.split('.')[0].
    pq_file = f"{os.path.splitext(csv_file)[0]}.parquet"

    table = csv.read_csv(
        csv_file,
        read_options=csv.ReadOptions(encoding=encoding),
        parse_options=csv.ParseOptions(delimiter=delimiter),
    )
    pq.write_table(table, pq_file)

    # The CSV is no longer needed once the Parquet copy exists.
    os.remove(csv_file)

    return pq_file


# TODO: REPLACE with method from bag-extract.src.util.gcp.gcs.
@task
def get_storage_client(credentials: dict):
    """Return a google-cloud-storage Client authenticated with the given
    service-account info dict."""
    sa_credentials = service_account.Credentials.from_service_account_info(
        credentials
    )
    return storage.Client(None, sa_credentials)


@task
def parquet_to_gcs(file, bucket_name, credential):
    """Upload a Parquet file to the given GCS bucket.

    Args:
        file: local path of the Parquet file. NOTE(review): used verbatim as
            the blob name, so the blob includes the local directory prefix;
            the commented lines below suggest `enexis/<file_name>` was the
            intended naming — confirm.
        bucket_name: name of the destination GCS bucket.
        credential: path to a service-account JSON key file (local testing).
    """
    # file_name = file.split("/")[-1]
    # blob_name = f"enexis/{file_name}"

    # Local Testing.
    client = storage.Client.from_service_account_json(
        credential
    )

    # Prefect
    # client = get_storage_client.run(credential)

    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file)

    blob.upload_from_filename(file)


if __name__ == "__main__":
    # Earlier sequential (non-Prefect) version, kept for reference:
    # for year in enexis_data:
    # path_csv = get_csv(year)
    # path_pq = csv_to_parquet(path_csv, delimiter=";")
    # parquet_to_gcs(path_pq, "temp-prefect-data", "/Users/eddylim/Documents/gcp_keys/prefect_key.json")

    # with Flow("Download Enexis", storage=local()) as flow:
    with Flow("Download Enexis") as flow:
        logger = prefect.context.get("logger")

        # Secrets
        # gcp_credentials = PrefectSecret("GCP_CREDENTIALS")
        # NOTE(review): hard-coded local key path — switch back to the
        # PrefectSecret above before deploying; this path only exists on the
        # author's machine.
        gcp_credentials = "/Users/eddylim/Documents/gcp_keys/prefect_key.json"
        enexis_dir = Parameter("enexis_dir", default="enexis")
        bucket_name = Parameter("bucket_name", default="temp-prefect-data")

        # Build the DAG: one mapped task per year in enexis_data
        # (download CSV -> convert to Parquet -> upload to GCS).
        data_dir = create_dir(enexis_dir)
        path_csv = get_csv.map(years=enexis_data, directory=unmapped(data_dir))
        path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";"))
        parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials))

    # Local testing.
    # flow.run()

    # Register the Flow.
    flow.register("toepol")
8 changes: 8 additions & 0 deletions flows/enexis/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
google-api-core
google-auth
google-cloud-core==2.3.0
google-cloud-dataproc==4.0.2
google-cloud-storage==2.4.0
pyarrow==8.0.0
wget==3.2

11 changes: 11 additions & 0 deletions flows/enexis/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from setuptools import find_packages, setup

# requirements.txt is the single source of truth for install dependencies;
# it is read at build time from the working directory.
with open("requirements.txt") as f:
    requirements = f.read().splitlines()

setup(
    name="enexis",
    version="0.1",
    packages=find_packages(),
    install_requires=requirements,
)
95 changes: 95 additions & 0 deletions flows/enexis/src/enexis/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import wget
import os
from pyarrow import csv
import pyarrow.parquet as pq
from util.gcp import gcs
from dotenv import load_dotenv

from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.secrets import PrefectSecret
from prefect.storage.github import GitHub
from prefect.run_configs import DockerRun

# Years of Enexis "kleinverbruik" (small-consumption) data to download.
enexis_data = ["2019", "2020", "2021", "2022"]

load_dotenv()

@task
def create_dir(dir):
    """Create the download directory and return its path.

    Uses makedirs with exist_ok=True so repeated flow runs do not crash:
    the original os.mkdir raised FileExistsError when the directory was
    already present from an earlier run.
    """
    os.makedirs(dir, exist_ok=True)
    return dir


@task
def get_csv(years, directory):
    """Download one year of Enexis kleinverbruik CSV data.

    Deletes any stale copy from a previous run first (otherwise wget would
    save a numbered duplicate). Returns the local CSV path.
    """
    local_path = f"{directory}/enexis-kleinverbruik-{years}.csv"
    remote_url = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv"

    if os.path.exists(local_path):
        os.remove(local_path)

    wget.download(remote_url, local_path)
    return local_path


@task
def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","):
    """Convert a CSV file to Parquet and remove the source CSV.

    Args:
        csv_file: path of the CSV to convert.
        encoding: text encoding of the CSV (passed to pyarrow's reader).
        delimiter: field separator of the CSV.

    Returns:
        Path of the written Parquet file.
    """
    # os.path.splitext keeps the full stem even if the directory name contains
    # a dot, unlike the previous csv_file.split('.')[0].
    pq_file = f"{os.path.splitext(csv_file)[0]}.parquet"

    table = csv.read_csv(
        csv_file,
        read_options=csv.ReadOptions(encoding=encoding),
        parse_options=csv.ParseOptions(delimiter=delimiter),
    )
    pq.write_table(table, pq_file)

    # The CSV is no longer needed once the Parquet copy exists.
    os.remove(csv_file)

    return pq_file


@task
def parquet_to_gcs(file, bucket_name, credential):
    """Upload a Parquet file to the given GCS bucket.

    Args:
        file: local path of the Parquet file. Used verbatim as the blob name,
            so the blob path mirrors the local path.
        bucket_name: name of the destination GCS bucket.
        credential: service-account info passed to gcs.get_storage_client —
            presumably a dict from the GCP_CREDENTIALS secret; verify.
    """
    # NOTE(review): calling another task's .run() inside a task bypasses
    # Prefect's orchestration for that call — works, but confirm intended.
    client = gcs.get_storage_client.run(credential)

    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file)

    blob.upload_from_filename(file)


# Flow definition: the flow source is pulled from GitHub at run time and
# executed inside the Docker image built by flows/enexis/Dockerfile.
with Flow(
    "enexis",
    storage=GitHub(
        repo="dataverbinders/toepol",
        path="flows/enexis/src/enexis/flow.py",
    ),
    run_config=DockerRun(
        # $image is read from the environment (.env via load_dotenv above).
        image=os.getenv("image"),
        labels=["enexis"],
        # Heartbeats in a thread — presumably to avoid subprocess heartbeat
        # issues inside Docker; TODO confirm.
        env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
    ),
) as flow:
    # Secrets
    gcp_credentials = PrefectSecret("GCP_CREDENTIALS")

    # Parameters
    enexis_dir = Parameter("enexis_dir", default="enexis")
    bucket_name = Parameter("bucket_name", default="temp-prefect-data")

    # Build the DAG: one mapped task per year in enexis_data
    # (download CSV -> convert to Parquet -> upload to GCS).
    data_dir = create_dir(enexis_dir)
    path_csv = get_csv.map(years=enexis_data, directory=unmapped(data_dir))
    path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";"))
    parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials))

# Register against "toepol" in production, "dev-toepol" otherwise
# (controlled by the $production environment variable).
prefect_project = "toepol" if os.getenv("production") == "True" else "dev-toepol"

flow.register(prefect_project)
Empty file.
Empty file.
39 changes: 39 additions & 0 deletions flows/enexis/src/util/gcp/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from prefect import task
from google.cloud import storage
from google.oauth2 import service_account


@task
def get_storage_client(credentials: dict):
    """Return a google-cloud-storage Client authenticated with the given
    service-account info dict."""
    sa_credentials = service_account.Credentials.from_service_account_info(
        credentials
    )
    return storage.Client(None, sa_credentials)


@task
def get_storage_bucket(client, bucket):
    """Return a Bucket handle for *bucket* bound to *client*."""
    return storage.Bucket(client, bucket)


@task
def upload_file_to_blob(file, blob_name, bucket):
    """Upload a local file to *bucket* under *blob_name*.

    Args:
        file: local path of the file to upload.
        blob_name: destination object name within the bucket.
        bucket: a google.cloud.storage Bucket handle.

    Returns:
        The blob name that was uploaded.
    """
    # upload_from_filename handles opening/reading the file itself; the old
    # commented-out manual open() was dead code and has been removed.
    blob = storage.Blob(blob_name, bucket)
    blob.upload_from_filename(file)
    return blob_name


@task
def upload_files_to_gcs(files, blob_names, credentials, bucket):
    """Upload several local files to a GCS bucket.

    Args:
        files: local file paths to upload.
        blob_names: destination blob name for each file; assumed to be the
            same length as *files*.
        credentials: service-account info dict.
        bucket: name of the target bucket.

    Returns:
        List of the uploaded blob names.
    """
    client = get_storage_client.run(credentials)
    gcs_bucket = get_storage_bucket.run(client, bucket)

    # zip pairs each file with its blob name directly, replacing the old
    # index-based range(len(files)) loop.
    return [
        upload_file_to_blob.run(file, blob_name, gcs_bucket)
        for file, blob_name in zip(files, blob_names)
    ]