Skip to content
This repository was archived by the owner on Jul 15, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions nl_open_data/flows/register/register_statline_bq_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,17 @@
# Providing it as default in the `register` stage is a temporary hotfix (though providing a default might be considered anyway)
config = Parameter("config", default=CONFIG)
gcp_env = Parameter("gcp_env", default="dev")
endpoint = Parameter("endpoint", default="bq")
# endpoint = Parameter("endpoint", default="bq")
local_dir = Parameter(
"local_dir", default=None
) # TODO: how to manage in a mapped context? is it even needed here?
force = Parameter("force", default=False)
credentials = Parameter("credentials", default=None)

config_box = nlt.dict_to_box(config, frozen_box=True)
gcp_env = nlt.lower(gcp_env)
odata_versions = check_v4.map(ids)
gcp = set_gcp(config, gcp_env, source)
gcp = set_gcp(config_box, gcp_env, source)
skips = skip_dataset.map(
id=ids,
source=unmapped(source),
Expand All @@ -109,7 +110,7 @@
id=ids,
source=unmapped(source),
third_party=unmapped(third_party),
config=unmapped(config),
config=unmapped(config_box),
gcp_env=unmapped(gcp_env),
force=unmapped(force),
credentials=unmapped(credentials),
Expand All @@ -120,7 +121,7 @@
id=ids,
odata_version=odata_versions,
source=unmapped(source),
config=unmapped(config),
config=unmapped(config_box),
upstream_tasks=[pq_files],
# BUG: Why is this needed? setting "remove_dir.trigger = all_finished" should have been sufficient, but it isn't
# This dependency is set to ensure that remove_dir (which has a data dependency on this task) is run after main
Expand Down
1 change: 1 addition & 0 deletions nl_open_data/flows/run/cbs/run_statline_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
"config": CONFIG
}

flow_run_id = prefect_client.create_flow_run(
Expand Down
2 changes: 2 additions & 0 deletions nl_open_data/flows/run/cbs/run_statline_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
THIRD_PARTY = True
GCP_ENV = "prod"
FORCE = False
CONFIG = config

for i in range(len(DATA) // 10 + 1):
# run parameters
Expand All @@ -41,6 +42,7 @@
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
"config": CONFIG
}

flow_run_id = prefect_client.create_flow_run(
Expand Down
6 changes: 3 additions & 3 deletions nl_open_data/flows/run/cbs/run_test_statline_bq_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
TENANT_SLUG = "dataverbinders"

# flow parameters
DATA = ["83583NED"]
# DATA = ["83583NED", "83765NED"]
# DATA = ["83583NED"]
DATA = ["83583NED", "83765NED"]
# DATA = ["84750NED"]
SOURCE = "cbs"
THIRD_PARTY = False
Expand All @@ -38,7 +38,7 @@
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
# "config": CONFIG,
"config": CONFIG,
}

if __name__ == "__main__":
Expand Down
7 changes: 6 additions & 1 deletion nl_open_data/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union, Mapping, Sequence
from typing import Union, Mapping, Sequence, Dict
from pathlib import Path
import os
from shutil import rmtree
Expand All @@ -19,6 +19,11 @@
import nl_open_data.utils as nlu


@task
def dict_to_box(dict: Dict, frozen_box: bool = False):
return Box(dict, frozen_box=frozen_box)


@task
def same_path(path):
return path
Expand Down
Loading