From f82dc6854beda763204bd7234eabb9cdd734a48c Mon Sep 17 00:00:00 2001 From: eddyVintus Date: Wed, 27 Jul 2022 09:20:49 +0200 Subject: [PATCH 1/4] Create enexis.py Created Prefect pipeline to download enexis data locally, transform to parquet and upload to GCS --- flows/enexis/enexis.py | 114 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 flows/enexis/enexis.py diff --git a/flows/enexis/enexis.py b/flows/enexis/enexis.py new file mode 100644 index 0000000..8e17ec9 --- /dev/null +++ b/flows/enexis/enexis.py @@ -0,0 +1,114 @@ +import wget +import os +from pyarrow import csv +import pyarrow.parquet as pq +from google.cloud import storage +from google.oauth2 import service_account +# from bag-extract.src.util.gcp import gcs + +import prefect +from prefect import Flow, Parameter, task, unmapped, resource_manager +from prefect.tasks.secrets import PrefectSecret +from pyspark.sql import SparkSession + +enexis_data = [ + "2019" + , "2020" + , "2021" + , "2022" +] + + +@task +def get_csv(years): + csv_output = f"{os.getcwd()}/data/enexis-kleinverbruik-{years}.csv" + csv_url = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv" + + + if os.path.exists(csv_output): + os.remove(csv_output) + + wget.download(csv_url, csv_output) + + return csv_output + + +# REPLACE with nl-open-data method 'csv_to_parquet()'. +@task +def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","): + encoding + + pq_file = f"{csv_file.split('.')[0]}.parquet" + + table = csv.read_csv( + csv_file, + read_options=csv.ReadOptions(encoding=encoding), + parse_options=csv.ParseOptions(delimiter=delimiter) + ) + + pq.write_table(table, pq_file) + + os.remove(csv_file) + + return pq_file + + +# REPLACE with method from bag-extract.src.util.gcp.gcs. +@task +def get_storage_client(credentials: dict): + credentials = service_account.Credentials.from_service_account_info( + credentials + ) + storage_client = storage.Client(None, credentials) + return storage_client + + +@task +def parquet_to_gcs(file, bucket_name, credential): + file_name = file.split("/")[-1] + blob_name = f"enexis/{file_name}" + + # Local Testing. + # client = storage.Client.from_service_account_json( + # credential + # ) + + # Prefect + client = get_storage_client.run(credential) + + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_name) + + blob.upload_from_filename(file) + + +if __name__ == "__main__": + # for year in enexis_data: + # path_csv = get_csv(year) + # path_pq = csv_to_parquet(path_csv, delimiter=";") + # parquet_to_gcs(path_pq, "temp-prefect-data", "/Users/eddylim/Documents/gcp_keys/prefect_key.json") + + # with Flow("Download Enexis", storage=local()) as flow: + with Flow("Download Enexis") as flow: + logger = prefect.context.get("logger") + + # Secrets + gcp_credentials = PrefectSecret("GCP_CREDENTIALS") + # gcp_credentials = "/Users/eddylim/Documents/gcp_keys/prefect_key.json" + bucket_name = Parameter("bucket_name", default="temp-prefect-data") + + # for year in enexis_data: + # path_csv = get_csv(year) + # path_pq = csv_to_parquet(path_csv, delimiter=";") + # parquet_to_gcs(path_pq, bucket_name, gcp_credentials) + + + path_csv = get_csv.map(enexis_data) + path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";")) + parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials)) + + # Local testing. + # flow.run() + + # Register the Flow. + flow.register("toepol") \ No newline at end of file From b0e45d646b99932c2a2a128dc038edf7fd4e53af Mon Sep 17 00:00:00 2001 From: eddyVintus Date: Tue, 16 Aug 2022 11:38:21 +0200 Subject: [PATCH 2/4] Restructure Flow folder --- flows/enexis/Dockerfile | 15 +++++ flows/enexis/docker-compose.yml | 14 ++++ flows/enexis/enexis.py | 43 ++++++------- flows/enexis/requirements.txt | 8 +++ flows/enexis/setup.py | 11 ++++ flows/enexis/src/enexis/flow.py | 93 +++++++++++++++++++++++++++ flows/enexis/src/util/__init__.py | 0 flows/enexis/src/util/gcp/__init__.py | 0 flows/enexis/src/util/gcp/gcs.py | 39 +++++++++++ 9 files changed, 201 insertions(+), 22 deletions(-) create mode 100644 flows/enexis/Dockerfile create mode 100644 flows/enexis/docker-compose.yml create mode 100644 flows/enexis/requirements.txt create mode 100644 flows/enexis/setup.py create mode 100644 flows/enexis/src/enexis/flow.py create mode 100644 flows/enexis/src/util/__init__.py create mode 100644 flows/enexis/src/util/gcp/__init__.py create mode 100644 flows/enexis/src/util/gcp/gcs.py diff --git a/flows/enexis/Dockerfile b/flows/enexis/Dockerfile new file mode 100644 index 0000000..fc74a88 --- /dev/null +++ b/flows/enexis/Dockerfile @@ -0,0 +1,15 @@ +FROM prefecthq/prefect:latest + +RUN pip install --upgrade pip + +WORKDIR /opt/prefect + +COPY ./src/util/ /opt/prefect/util/ +COPY ./requirements.txt . +COPY ./setup.py . + +RUN pip install . --no-binary pydantic + +COPY ./src/enexis/ /opt/prefect/enexis/ + +RUN pip install prefect diff --git a/flows/enexis/docker-compose.yml b/flows/enexis/docker-compose.yml new file mode 100644 index 0000000..c64bf82 --- /dev/null +++ b/flows/enexis/docker-compose.yml @@ -0,0 +1,14 @@ +version: "3.7" + +services: + bag-extract: + build: + context: . + dockerfile: Dockerfile + entrypoint: ["/bin/sh", "-c"] + image: $image + command: + - | + prefect backend cloud + prefect auth login --key $prefect_api_key + python3 /opt/prefect/enexis/flow.py diff --git a/flows/enexis/enexis.py b/flows/enexis/enexis.py index 8e17ec9..88917c2 100644 --- a/flows/enexis/enexis.py +++ b/flows/enexis/enexis.py @@ -4,12 +4,10 @@ import pyarrow.parquet as pq from google.cloud import storage from google.oauth2 import service_account -# from bag-extract.src.util.gcp import gcs import prefect -from prefect import Flow, Parameter, task, unmapped, resource_manager +from prefect import Flow, Parameter, task, unmapped from prefect.tasks.secrets import PrefectSecret -from pyspark.sql import SparkSession enexis_data = [ "2019" @@ -18,10 +16,16 @@ , "2022" ] +@task +def create_dir(dir): + os.mkdir(dir) + + return dir + @task -def get_csv(years): - csv_output = f"{os.getcwd()}/data/enexis-kleinverbruik-{years}.csv" +def get_csv(years, directory): + csv_output = f"{directory}/enexis-kleinverbruik-{years}.csv" csv_url = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv" @@ -33,7 +37,6 @@ def get_csv(years): return csv_output -# REPLACE with nl-open-data method 'csv_to_parquet()'. @task def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","): encoding @@ -65,19 +68,19 @@ def get_storage_client(credentials: dict): @task def parquet_to_gcs(file, bucket_name, credential): - file_name = file.split("/")[-1] - blob_name = f"enexis/{file_name}" + # file_name = file.split("/")[-1] + # blob_name = f"enexis/{file_name}" # Local Testing. - # client = storage.Client.from_service_account_json( - # credential - # ) + client = storage.Client.from_service_account_json( + credential + ) # Prefect - client = get_storage_client.run(credential) + # client = get_storage_client.run(credential) bucket = client.bucket(bucket_name) - blob = bucket.blob(blob_name) + blob = bucket.blob(file) blob.upload_from_filename(file) @@ -93,17 +96,13 @@ def parquet_to_gcs(file, bucket_name, credential): logger = prefect.context.get("logger") # Secrets - gcp_credentials = PrefectSecret("GCP_CREDENTIALS") - # gcp_credentials = "/Users/eddylim/Documents/gcp_keys/prefect_key.json" + # gcp_credentials = PrefectSecret("GCP_CREDENTIALS") + gcp_credentials = "/Users/eddylim/Documents/gcp_keys/prefect_key.json" + enexis_dir = Parameter("enexis_dir", default="enexis") bucket_name = Parameter("bucket_name", default="temp-prefect-data") - # for year in enexis_data: - # path_csv = get_csv(year) - # path_pq = csv_to_parquet(path_csv, delimiter=";") - # parquet_to_gcs(path_pq, bucket_name, gcp_credentials) - - - path_csv = get_csv.map(enexis_data) + data_dir = create_dir(enexis_dir) + path_csv = get_csv.map(years=enexis_data, directory=unmapped(data_dir)) path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";")) parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials)) diff --git a/flows/enexis/requirements.txt b/flows/enexis/requirements.txt new file mode 100644 index 0000000..6b50eb5 --- /dev/null +++ b/flows/enexis/requirements.txt @@ -0,0 +1,8 @@ +google-api-core +google-auth +google-cloud-core==2.3.0 +google-cloud-dataproc==4.0.2 +google-cloud-storage==2.4.0 +pyarrow==8.0.0 +wget==3.2 + diff --git a/flows/enexis/setup.py b/flows/enexis/setup.py new file mode 100644 index 0000000..b168a34 --- /dev/null +++ b/flows/enexis/setup.py @@ -0,0 +1,11 @@ +from setuptools import find_packages, setup + +with open("requirements.txt") as f: + requirements = f.read().splitlines() + +setup( + name="enexis", + version="0.1", + packages=find_packages(), + install_requires=requirements, +) diff --git a/flows/enexis/src/enexis/flow.py b/flows/enexis/src/enexis/flow.py new file mode 100644 index 0000000..f4d2394 --- /dev/null +++ b/flows/enexis/src/enexis/flow.py @@ -0,0 +1,93 @@ +import wget +import os +from pyarrow import csv +import pyarrow.parquet as pq +from util.gcp import gcs +from dotenv import load_dotenv + +from prefect import Flow, Parameter, task, unmapped +from prefect.tasks.secrets import PrefectSecret +from prefect.storage.github import GitHub +from prefect.run_configs import DockerRun + +enexis_data = [ + "2019" + , "2020" + , "2021" + , "2022" +] + +@task +def create_dir(dir): + os.mkdir(dir) + return dir + + +@task +def get_csv(years, directory): + csv_output = f"{directory}/enexis-kleinverbruik-{years}.csv" + csv_url = f"https://s3-eu-west-1.amazonaws.com/enxp433-oda01/kv/Enexis_kleinverbruiksgegevens_0101{years}.csv" + + + if os.path.exists(csv_output): + os.remove(csv_output) + + wget.download(csv_url, csv_output) + + return csv_output + + +@task +def csv_to_parquet(csv_file, encoding="utf-8", delimiter=","): + pq_file = f"{csv_file.split('.')[0]}.parquet" + + table = csv.read_csv( + csv_file, + read_options=csv.ReadOptions(encoding=encoding), + parse_options=csv.ParseOptions(delimiter=delimiter) + ) + + pq.write_table(table, pq_file) + + os.remove(csv_file) + + return pq_file + + +@task +def parquet_to_gcs(file, bucket_name, credential): + client = gcs.get_storage_client.run(credential) + + bucket = client.bucket(bucket_name) + blob = bucket.blob(file) + + blob.upload_from_filename(file) + + +with Flow( + "enexis", + storage=GitHub( + repo="dataverbinders/toepol", + path="flows/enexis/src/enexis/flow.py", + ), + run_config=DockerRun( + image=os.getenv("image"), + labels=["enexis"], + env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}, + ), +) as flow: + # Secrets + gcp_credentials = PrefectSecret("GCP_CREDENTIALS") + + # Parameters + enexis_dir = Parameter("enexis_dir", default="enexis") + bucket_name = Parameter("bucket_name", default="temp-prefect-data") + + data_dir = create_dir(enexis_dir) + path_csv = get_csv.map(years=enexis_data, directory=unmapped(data_dir)) + path_pq = csv_to_parquet.map(path_csv, delimiter=unmapped(";")) + parquet_to_gcs.map(path_pq, unmapped(bucket_name), unmapped(gcp_credentials)) + +prefect_project = "toepol" if os.getenv("production") == "True" else "dev-toepol" + +flow.register("prefect-project") \ No newline at end of file diff --git a/flows/enexis/src/util/__init__.py b/flows/enexis/src/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/enexis/src/util/gcp/__init__.py b/flows/enexis/src/util/gcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/enexis/src/util/gcp/gcs.py b/flows/enexis/src/util/gcp/gcs.py new file mode 100644 index 0000000..f133da0 --- /dev/null +++ b/flows/enexis/src/util/gcp/gcs.py @@ -0,0 +1,39 @@ +from prefect import task +from google.cloud import storage +from google.oauth2 import service_account + + +@task +def get_storage_client(credentials: dict): + credentials = service_account.Credentials.from_service_account_info( + credentials + ) + storage_client = storage.Client(None, credentials) + return storage_client + + +@task +def get_storage_bucket(client, bucket): + bucket = storage.Bucket(client, bucket) + return bucket + + +@task +def upload_file_to_blob(file, blob_name, bucket): + blob = storage.Blob(blob_name, bucket) + # with open(file, "rt", encoding="ISO-8859-1") as f: + blob.upload_from_filename(file) + return blob_name + + +@task +def upload_files_to_gcs(files, blob_names, credentials, bucket): + client = get_storage_client.run(credentials) + bucket = get_storage_bucket.run(client, bucket) + + blobs = [ + upload_file_to_blob.run(files[i], blob_names[i], bucket) + for i in range(len(files)) + ] + + return blobs From 40065b076bb3e4973500d2c95858d3ef5b30b1e5 Mon Sep 17 00:00:00 2001 From: eddyVintus Date: Tue, 16 Aug 2022 14:07:43 +0200 Subject: [PATCH 3/4] Fix dotenv --- flows/enexis/src/enexis/flow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flows/enexis/src/enexis/flow.py b/flows/enexis/src/enexis/flow.py index f4d2394..63bf28a 100644 --- a/flows/enexis/src/enexis/flow.py +++ b/flows/enexis/src/enexis/flow.py @@ -17,6 +17,8 @@ , "2022" ] +load_dotenv() + @task def create_dir(dir): os.mkdir(dir) @@ -90,4 +92,4 @@ def parquet_to_gcs(file, bucket_name, credential): prefect_project = "toepol" if os.getenv("production") == "True" else "dev-toepol" -flow.register("prefect-project") \ No newline at end of file +flow.register(prefect_project) \ No newline at end of file From a70168be788642bf8b5059f73deadee66bd08ea7 Mon Sep 17 00:00:00 2001 From: MarcZoon Date: Tue, 16 Aug 2022 14:14:40 +0200 Subject: [PATCH 4/4] Update docker-compose.yml --- flows/enexis/docker-compose.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flows/enexis/docker-compose.yml b/flows/enexis/docker-compose.yml index c64bf82..12407c8 100644 --- a/flows/enexis/docker-compose.yml +++ b/flows/enexis/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.7" services: - bag-extract: + enexis: build: context: . dockerfile: Dockerfile @@ -12,3 +12,5 @@ services: prefect backend cloud prefect auth login --key $prefect_api_key python3 /opt/prefect/enexis/flow.py + env_file: + - .env