diff --git a/flows/enexis/Dockerfile b/flows/enexis/Dockerfile new file mode 100644 index 0000000..fc74a88 --- /dev/null +++ b/flows/enexis/Dockerfile @@ -0,0 +1,15 @@ +FROM prefecthq/prefect:latest + +RUN pip install --upgrade pip + +WORKDIR /opt/prefect + +COPY ./src/util/ /opt/prefect/util/ +COPY ./requirements.txt . +COPY ./setup.py . + +RUN pip install . --no-binary pydantic + +COPY ./src/enexis/ /opt/prefect/enexis/ + +RUN pip install prefect diff --git a/flows/enexis/docker-compose.yml b/flows/enexis/docker-compose.yml new file mode 100644 index 0000000..12407c8 --- /dev/null +++ b/flows/enexis/docker-compose.yml @@ -0,0 +1,16 @@ +version: "3.7" + +services: + enexis: + build: + context: . + dockerfile: Dockerfile + entrypoint: ["/bin/sh", "-c"] + image: $image + command: + - | + prefect backend cloud + prefect auth login --key $prefect_api_key + python3 /opt/prefect/enexis/flow.py + env_file: + - .env diff --git a/flows/enexis/enexis.py b/flows/enexis/enexis.py new file mode 100644 index 0000000..88917c2 --- /dev/null +++ b/flows/enexis/enexis.py @@ -0,0 +1,113 @@ +import wget +import os +from pyarrow import csv +import pyarrow.parquet as pq +from google.cloud import storage +from google.oauth2 import service_account + +import prefect +from prefect import Flow, Parameter, task, unmapped +from prefect.tasks.secrets import PrefectSecret + +enexis_data = [ + "2019" + , "2020" + , "2021" + , "2022" +] + +@task +def create_dir(dir): + os.mkdir(dir) + + return dir + + +@task +def get_csv(years, directory): + csv_output = f"{directory}/enexis-kleinverbruik-{years}.csv" + csv_url = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv" + + + if os.path.exists(csv_output): + os.remove(csv_output) + + wget.download(csv_url, csv_output) + + return csv_output + + +@task +def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","): + encoding + + pq_file = f"{csv_file.split('.')[0]}.parquet" + + table = csv.read_csv( + csv_file, + read_options=csv.ReadOptions(encoding=encoding), + parse_options=csv.ParseOptions(delimiter=delimiter) + ) + + pq.write_table(table, pq_file) + + os.remove(csv_file) + + return pq_file + + +# REPLACE with method from bag-extract.src.util.gcp.gcs. +@task +def get_storage_client(credentials: dict): + credentials = service_account.Credentials.from_service_account_info( + credentials + ) + storage_client = storage.Client(None, credentials) + return storage_client + + +@task +def parquet_to_gcs(file, bucket_name, credential): + # file_name = file.split("/")[-1] + # blob_name = f"enexis/{file_name}" + + # Local Testing. + client = storage.Client.from_service_account_json( + credential + ) + + # Prefect + # client = get_storage_client.run(credential) + + bucket = client.bucket(bucket_name) + blob = bucket.blob(file) + + blob.upload_from_filename(file) + + +if __name__ == "__main__": + # for year in enexis_data: + # path_csv = get_csv(year) + # path_pq = csv_to_parquet(path_csv, delimiter=";") + # parquet_to_gcs(path_pq, "temp-prefect-data", "/Users/eddylim/Documents/gcp_keys/prefect_key.json") + + # with Flow("Download Enexis", storage=local()) as flow: + with Flow("Download Enexis") as flow: + logger = prefect.context.get("logger") + + # Secrets + # gcp_credentials = PrefectSecret("GCP_CREDENTIALS") + gcp_credentials = "/Users/eddylim/Documents/gcp_keys/prefect_key.json" + enexis_dir = Parameter("enexis_dir", default="enexis") + bucket_name = Parameter("bucket_name", default="temp-prefect-data") + + data_dir = create_dir(enexis_dir) + path_csv = get_csv.map(years=enexis_data, directory=unmapped(data_dir)) + path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";")) + parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials)) + + # Local testing. + # flow.run() + + # Register the Flow. + flow.register("toepol") \ No newline at end of file diff --git a/flows/enexis/requirements.txt b/flows/enexis/requirements.txt new file mode 100644 index 0000000..6b50eb5 --- /dev/null +++ b/flows/enexis/requirements.txt @@ -0,0 +1,8 @@ +google-api-core +google-auth +google-cloud-core==2.3.0 +google-cloud-dataproc==4.0.2 +google-cloud-storage==2.4.0 +pyarrow==8.0.0 +wget==3.2 + diff --git a/flows/enexis/setup.py b/flows/enexis/setup.py new file mode 100644 index 0000000..b168a34 --- /dev/null +++ b/flows/enexis/setup.py @@ -0,0 +1,11 @@ +from setuptools import find_packages, setup + +with open("requirements.txt") as f: + requirements = f.read().splitlines() + +setup( + name="enexis", + version="0.1", + packages=find_packages(), + install_requires=requirements, +) diff --git a/flows/enexis/src/enexis/flow.py b/flows/enexis/src/enexis/flow.py new file mode 100644 index 0000000..63bf28a --- /dev/null +++ b/flows/enexis/src/enexis/flow.py @@ -0,0 +1,95 @@ +import wget +import os +from pyarrow import csv +import pyarrow.parquet as pq +from util.gcp import gcs +from dotenv import load_dotenv + +from prefect import Flow, Parameter, task, unmapped +from prefect.tasks.secrets import PrefectSecret +from prefect.storage.github import GitHub +from prefect.run_configs import DockerRun + +enexis_data = [ + "2019" + , "2020" + , "2021" + , "2022" +] + +load_dotenv() + +@task +def create_dir(dir): + os.mkdir(dir) + return dir + + +@task +def get_csv(years, directory): + csv_output = f"{directory}/enexis-kleinverbruik-{years}.csv" + csv_url = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv" + + + if os.path.exists(csv_output): + os.remove(csv_output) + + wget.download(csv_url, csv_output) + + return csv_output + + +@task +def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","): + pq_file = f"{csv_file.split('.')[0]}.parquet" + + table = csv.read_csv( + csv_file, + read_options=csv.ReadOptions(encoding=encoding), + parse_options=csv.ParseOptions(delimiter=delimiter) + ) + + pq.write_table(table, pq_file) + + os.remove(csv_file) + + return pq_file + + +@task +def parquet_to_gcs(file, bucket_name, credential): + client = gcs.get_storage_client.run(credential) + + bucket = client.bucket(bucket_name) + blob = bucket.blob(file) + + blob.upload_from_filename(file) + + +with Flow( + "enexis", + storage=GitHub( + repo="dataverbinders/toepol", + path="flows/enexis/src/enexis/flow.py", + ), + run_config=DockerRun( + image=os.getenv("image"), + labels=["enexis"], + env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}, + ), +) as flow: + # Secrets + gcp_credentials = PrefectSecret("GCP_CREDENTIALS") + + # Parameters + enexis_dir = Parameter("enexis_dir", default="enexis") + bucket_name = Parameter("bucket_name", default="temp-prefect-data") + + data_dir = create_dir(enexis_dir) + path_csv = get_csv.map(years=enexis_data, directory=unmapped(data_dir)) + path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";")) + parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials)) + +prefect_project = "toepol" if os.getenv("production") == "True" else "dev-toepol" + +flow.register(prefect_project) \ No newline at end of file diff --git a/flows/enexis/src/util/__init__.py b/flows/enexis/src/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/enexis/src/util/gcp/__init__.py b/flows/enexis/src/util/gcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/enexis/src/util/gcp/gcs.py b/flows/enexis/src/util/gcp/gcs.py new file mode 100644 index 0000000..f133da0 --- /dev/null +++ b/flows/enexis/src/util/gcp/gcs.py @@ -0,0 +1,39 @@ +from prefect import task +from google.cloud import storage +from google.oauth2 import service_account + + +@task +def get_storage_client(credentials: dict): + credentials = service_account.Credentials.from_service_account_info( + credentials + ) + storage_client = storage.Client(None, credentials) + return storage_client + + +@task +def get_storage_bucket(client, bucket): + bucket = storage.Bucket(client, bucket) + return bucket + + +@task +def upload_file_to_blob(file, blob_name, bucket): + blob = storage.Blob(blob_name, bucket) + # with open(file, "rt", encoding="ISO-8859-1") as f: + blob.upload_from_filename(file) + return blob_name + + +@task +def upload_files_to_gcs(files, blob_names, credentials, bucket): + client = get_storage_client.run(credentials) + bucket = get_storage_bucket.run(client, bucket) + + blobs = [ + upload_file_to_blob.run(files[i], blob_names[i], bucket) + for i in range(len(files)) + ] + + return blobs