From 05b9fa8c5d9bc01fbca5983d27c2514b9fa98d0c Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Tue, 28 Apr 2020 22:54:52 +0545 Subject: [PATCH 1/4] dvc: implement params support for pipeline file --- dvc/schema.py | 2 ++ dvc/serialize.py | 34 ++++++++++++++++++++++++++++++---- dvc/stage/loader.py | 30 ++++++++++++++++++++++++++++++ dvc/stage/params.py | 1 + 4 files changed, 63 insertions(+), 4 deletions(-) diff --git a/dvc/schema.py b/dvc/schema.py index 3a1417542e..8d46685382 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -21,6 +21,7 @@ LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, StageParams.PARAM_DEPS: [DATA_SCHEMA], + StageParams.PARAM_PARAMS: [{str: object}], StageParams.PARAM_OUTS: [DATA_SCHEMA], } LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA} @@ -30,6 +31,7 @@ StageParams.PARAM_CMD: str, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [str], + Optional(StageParams.PARAM_PARAMS): [Any(str, {str: [str]})], Optional(StageParams.PARAM_LOCKED): bool, Optional(StageParams.PARAM_META): object, Optional(StageParams.PARAM_ALWAYS_CHANGED): bool, diff --git a/dvc/serialize.py b/dvc/serialize.py index 2e93729461..759cc7e00f 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -1,5 +1,8 @@ from typing import TYPE_CHECKING +from funcy import rpartial, lsplit_by + +from dvc.dependency import ParamsDependency from dvc.utils.collections import apply_diff from dvc.utils.stage import parse_stage_for_update @@ -21,14 +24,34 @@ def _get_outs(stage: "PipelineStage"): return outs_bucket +def get_params_deps(stage): + params, deps = lsplit_by( + rpartial(isinstance, ParamsDependency), stage.deps + ) + params_keys = [] + params_ = [] + for param in params: + dump = param.dumpd() + path, param_ = dump["path"], dump[stage.PARAM_PARAMS] + param_keys = list(param_.keys()) + if path == ParamsDependency.DEFAULT_PARAMS_FILE: + params_keys += param_keys + else: + params_keys += [{path: param_keys}] if param_keys else [] + params_ += [{path: dump[stage.PARAM_PARAMS]}] + return params_keys, params_, deps + + def to_pipeline_file(stage: "PipelineStage"): + params, _, deps = get_params_deps(stage) return { stage.name: { key: value for key, value in { stage.PARAM_CMD: stage.cmd, stage.PARAM_WDIR: stage.resolve_wdir(), - stage.PARAM_DEPS: [d.def_path for d in stage.deps], + stage.PARAM_DEPS: deps, + stage.PARAM_PARAMS: params, **_get_outs(stage), stage.PARAM_LOCKED: stage.locked, stage.PARAM_ALWAYS_CHANGED: stage.always_changed, @@ -43,17 +66,20 @@ def to_lockfile(stage: "PipelineStage") -> dict: assert stage.name res = {"cmd": stage.cmd} + _, params, deps = get_params_deps(stage) deps = [ {"path": dep.def_path, dep.checksum_type: dep.get_checksum()} - for dep in stage.deps + for dep in deps ] outs = [ {"path": out.def_path, out.checksum_type: out.get_checksum()} for out in stage.outs ] - if stage.deps: + if deps: res["deps"] = deps - if stage.outs: + if params: + res["params"] = params + if outs: res["outs"] = outs return {stage.name: res} diff --git a/dvc/stage/loader.py b/dvc/stage/loader.py index 9636956010..2d55108ab2 100644 --- a/dvc/stage/loader.py +++ b/dvc/stage/loader.py @@ -5,8 +5,11 @@ from copy import deepcopy from itertools import chain +from funcy import merge, first + from dvc import dependency, output from .exceptions import StageNameUnspecified, StageNotFound +from ..dependency import ParamsDependency logger = logging.getLogger(__name__) @@ -54,6 +57,29 @@ def _fill_lock_checksums(stage, lock_data): .get(item.checksum_type) ) + @classmethod + def _fill_params(cls, stage, pipeline_params, lock_params): + res = {} + default_file = ParamsDependency.DEFAULT_PARAMS_FILE + + def get_value(file, k): + return lock_params.get(file, {}).get(k) + + for key in pipeline_params: + if isinstance(key, str): + res.setdefault(default_file, {}).update( + {key: get_value(default_file, key)} + ) + elif isinstance(key, dict): + path = first(key) + res.setdefault(path, {}).update( + {k: get_value(path, k) for k in key[path]} + ) + return dependency.loadd_from( + stage, + [{"path": key, "params": params} for key, params in res.items()], + ) + @classmethod def load_stage(cls, dvcfile, name, stage_data, lock_data): from . import PipelineStage, Stage, loads_from @@ -63,8 +89,12 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data): ) stage = loads_from(PipelineStage, dvcfile.repo, path, wdir, stage_data) stage.name = name + params = stage_data.pop("params", {}) stage._fill_stage_dependencies(**stage_data) stage._fill_stage_outputs(**stage_data) + stage.deps += cls._fill_params( + stage, params, merge(*lock_data.get("params", [{}])) + ) if lock_data: stage.cmd_changed = lock_data.get( Stage.PARAM_CMD diff --git a/dvc/stage/params.py b/dvc/stage/params.py index 6ea8aa65f7..9ae388636d 100644 --- a/dvc/stage/params.py +++ b/dvc/stage/params.py @@ -10,6 +10,7 @@ class StageParams: PARAM_LOCKED = "locked" PARAM_META = "meta" PARAM_ALWAYS_CHANGED = "always_changed" + PARAM_PARAMS = "params" class OutputParams(Enum): From 60a59bce01ea75bbbfaf2bc020ae55ba8cfa276b Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 29 Apr 2020 10:26:16 +0545 Subject: [PATCH 2/4] reorganize dump/load of params --- dvc/schema.py | 2 +- dvc/serialize.py | 63 ++++++++++++++++++++++++------------- dvc/stage/loader.py | 76 +++++++++++++++++++++++++++++++++------------ 3 files changed, 99 insertions(+), 42 deletions(-) diff --git a/dvc/schema.py b/dvc/schema.py index 8d46685382..d6a24d1473 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -21,7 +21,7 @@ LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, StageParams.PARAM_DEPS: [DATA_SCHEMA], - StageParams.PARAM_PARAMS: [{str: object}], + StageParams.PARAM_PARAMS: {str: {str: object}}, StageParams.PARAM_OUTS: [DATA_SCHEMA], } LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA} diff --git a/dvc/serialize.py b/dvc/serialize.py index 759cc7e00f..29f9834a5d 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -5,10 +5,15 @@ from dvc.dependency import ParamsDependency from dvc.utils.collections import apply_diff from dvc.utils.stage import parse_stage_for_update +from typing import List if TYPE_CHECKING: from dvc.stage import PipelineStage, Stage +PARAM_PATH = ParamsDependency.PARAM_PATH +PARAM_PARAMS = ParamsDependency.PARAM_PARAMS +DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE + def _get_outs(stage: "PipelineStage"): outs_bucket = {} @@ -24,34 +29,50 @@ def _get_outs(stage: "PipelineStage"): return outs_bucket -def get_params_deps(stage): - params, deps = lsplit_by( - rpartial(isinstance, ParamsDependency), stage.deps - ) - params_keys = [] - params_ = [] - for param in params: - dump = param.dumpd() - path, param_ = dump["path"], dump[stage.PARAM_PARAMS] - param_keys = list(param_.keys()) - if path == ParamsDependency.DEFAULT_PARAMS_FILE: - params_keys += param_keys - else: - params_keys += [{path: param_keys}] if param_keys else [] - params_ += [{path: dump[stage.PARAM_PARAMS]}] - return params_keys, params_, deps +def get_params_deps(stage: "PipelineStage"): + return lsplit_by(rpartial(isinstance, ParamsDependency), stage.deps) + + +def _serialize_params(params: List[ParamsDependency]): + """Return two types of values from stage: + + `keys` - which is list of params without values, used in a pipeline file + + which is in the shape of: + ['lr', 'train', {'params2.yaml': ['lr']}] + `key_vals` - which is list of params with values, used in a lockfile + which is in the shape of: + {'params.yaml': {'lr': '1', 'train': 2}, {'params2.yaml': {'lr': '1'}} + """ + keys = [] + key_vals = {} + + for param_dep in params: + dump = param_dep.dumpd() + path, params = dump[PARAM_PATH], dump[PARAM_PARAMS] + k = list(params.keys()) + if not k: + continue + # if it's not a default file, change the shape + # to: {path: k} + keys.extend(k if path == DEFAULT_PARAMS_FILE else [{path: k}]) + key_vals.update({path: params}) + + return keys, key_vals def to_pipeline_file(stage: "PipelineStage"): - params, _, deps = get_params_deps(stage) + params, deps = get_params_deps(stage) + serialized_params, _ = _serialize_params(params) + return { stage.name: { key: value for key, value in { stage.PARAM_CMD: stage.cmd, stage.PARAM_WDIR: stage.resolve_wdir(), - stage.PARAM_DEPS: deps, - stage.PARAM_PARAMS: params, + stage.PARAM_DEPS: [d.def_path for d in deps], + stage.PARAM_PARAMS: serialized_params, **_get_outs(stage), stage.PARAM_LOCKED: stage.locked, stage.PARAM_ALWAYS_CHANGED: stage.always_changed, @@ -66,7 +87,7 @@ def to_lockfile(stage: "PipelineStage") -> dict: assert stage.name res = {"cmd": stage.cmd} - _, params, deps = get_params_deps(stage) + params, deps = get_params_deps(stage) deps = [ {"path": dep.def_path, dep.checksum_type: dep.get_checksum()} for dep in deps @@ -78,7 +99,7 @@ def to_lockfile(stage: "PipelineStage") -> dict: if deps: res["deps"] = deps if params: - res["params"] = params + _, res["params"] = _serialize_params(params) if outs: res["outs"] = outs diff --git a/dvc/stage/loader.py b/dvc/stage/loader.py index 2d55108ab2..aefe42789b 100644 --- a/dvc/stage/loader.py +++ b/dvc/stage/loader.py @@ -1,11 +1,11 @@ -import collections import logging import os from copy import deepcopy +from collections import defaultdict, Mapping from itertools import chain -from funcy import merge, first +from funcy import first from dvc import dependency, output from .exceptions import StageNameUnspecified, StageNotFound @@ -14,6 +14,9 @@ logger = logging.getLogger(__name__) +DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE + + def resolve_paths(path, wdir=None): path = os.path.abspath(path) wdir = wdir or os.curdir @@ -21,7 +24,7 @@ def resolve_paths(path, wdir=None): return path, wdir -class StageLoader(collections.abc.Mapping): +class StageLoader(Mapping): def __init__(self, dvcfile, stages_data, lockfile_data=None): self.dvcfile = dvcfile self.stages_data = stages_data or {} @@ -58,24 +61,59 @@ def _fill_lock_checksums(stage, lock_data): ) @classmethod - def _fill_params(cls, stage, pipeline_params, lock_params): - res = {} - default_file = ParamsDependency.DEFAULT_PARAMS_FILE - - def get_value(file, k): - return lock_params.get(file, {}).get(k) + def _load_params(cls, stage, pipeline_params, lock_params=None): + """ + File in pipeline file is expected to be in following format: + ``` + params: + - lr + - train.epochs + - params2.yaml: # notice the filename + - process.threshold + - process.bow + ``` + + and, in lockfile, we keep it as following format: + ``` + params: + params.yaml: + lr: 0.0041 + train.epochs: 100 + params2.yaml: + process.threshold: 0.98 + process.bow: + - 15000 + - 123 + ``` + + So, here, we merge these two formats into one (ignoring one's only + specified on lockfile but missing on pipeline file), and load the + `ParamsDependency` for the given stage. + + In the list of `params` inside pipeline file, if any of the item is + dict-like, the key will be treated as separate params file and it's + values to be part of that params file, else, the item is considered + as part of the `params.yaml` which is a default file. + + (From example above: `lr` is considered to be part of `params.yaml` + whereas `process.bow` to be part of `params2.yaml`.) + """ + res = defaultdict(lambda: defaultdict(dict)) + lock_params = lock_params or {} + + def get_value(file, param): + return lock_params.get(file, {}).get(param) for key in pipeline_params: if isinstance(key, str): - res.setdefault(default_file, {}).update( - {key: get_value(default_file, key)} - ) + path = DEFAULT_PARAMS_FILE + res[path][key] = get_value(path, key) elif isinstance(key, dict): path = first(key) - res.setdefault(path, {}).update( - {k: get_value(path, k) for k in key[path]} - ) - return dependency.loadd_from( + for k in key[path]: + res[path][k] = get_value(path, k) + + stage.deps += dependency.loadd_from( stage, [{"path": key, "params": params} for key, params in res.items()], ) @@ -92,9 +130,7 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data): params = stage_data.pop("params", {}) stage._fill_stage_dependencies(**stage_data) stage._fill_stage_outputs(**stage_data) - stage.deps += cls._fill_params( - stage, params, merge(*lock_data.get("params", [{}])) - ) + cls._load_params(stage, params, lock_data.get("params")) if lock_data: stage.cmd_changed = lock_data.get( Stage.PARAM_CMD @@ -132,7 +168,7 @@ def __contains__(self, name): return name in self.stages_data -class SingleStageLoader(collections.abc.Mapping): +class SingleStageLoader(Mapping): def __init__(self, dvcfile, stage_data, stage_text=None, tag=None): self.dvcfile = dvcfile self.stage_data = stage_data or {} From ba06d636514b1d8861dccf88883515a5890b665c Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 29 Apr 2020 13:16:35 +0545 Subject: [PATCH 3/4] add tests for params --- dvc/serialize.py | 4 +- tests/func/params/test_diff.py | 17 ++++++++ tests/func/params/test_show.py | 16 ++++++++ tests/func/test_repro_multistage.py | 60 ++++++++++++++++++++++++++++ tests/func/test_run_multistage.py | 61 +++++++++++++++++++++++++++++ 5 files changed, 156 insertions(+), 2 deletions(-) diff --git a/dvc/serialize.py b/dvc/serialize.py index 29f9834a5d..bc0bd8d3c6 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from funcy import rpartial, lsplit_by +from funcy import rpartial, lsplit from dvc.dependency import ParamsDependency from dvc.utils.collections import apply_diff @@ -30,7 +30,7 @@ def _get_outs(stage: "PipelineStage"): def get_params_deps(stage: "PipelineStage"): - return lsplit_by(rpartial(isinstance, ParamsDependency), stage.deps) + return lsplit(rpartial(isinstance, ParamsDependency), stage.deps) def _serialize_params(params: List[ParamsDependency]): diff --git a/tests/func/params/test_diff.py b/tests/func/params/test_diff.py index 0f8dc31a86..3dab45066d 100644 --- a/tests/func/params/test_diff.py +++ b/tests/func/params/test_diff.py @@ -102,3 +102,20 @@ def test_diff_with_unchanged(tmp_dir, scm, dvc): "xyz": {"old": "val", "new": "val"}, } } + + +def test_pipeline_tracked_params(tmp_dir, scm, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.gen({"foo": "foo", "params.yaml": "foo: bar\nxyz: val"}) + run_copy("foo", "bar", name="copy-foo-bar", params=["foo,xyz"]) + + scm.add(["params.yaml", PIPELINE_FILE]) + scm.commit("add stage") + + tmp_dir.scm_gen("params.yaml", "foo: baz\nxyz: val", commit="baz") + tmp_dir.scm_gen("params.yaml", "foo: qux\nxyz: val", commit="qux") + + assert dvc.params.diff(a_rev="HEAD~2") == { + "params.yaml": {"foo": {"old": "bar", "new": "qux"}} + } diff --git a/tests/func/params/test_show.py b/tests/func/params/test_show.py index b52d9e9a12..e1f72cb3c4 100644 --- a/tests/func/params/test_show.py +++ b/tests/func/params/test_show.py @@ -42,3 +42,19 @@ def test_show_branch(tmp_dir, scm, dvc): "working tree": {"params.yaml": {"foo": "bar"}}, "branch": {"params.yaml": {"foo": "baz"}}, } + + +def test_pipeline_tracked_params(tmp_dir, scm, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.gen({"foo": "foo", "params.yaml": "foo: bar\nxyz: val"}) + run_copy("foo", "bar", name="copy-foo-bar", params=["foo,xyz"]) + scm.add(["params.yaml", PIPELINE_FILE]) + scm.commit("add stage") + + tmp_dir.scm_gen("params.yaml", "foo: baz\nxyz: val", commit="baz") + tmp_dir.scm_gen("params.yaml", "foo: qux\nxyz: val", commit="qux") + + assert dvc.params.show(revs=["master"]) == { + "master": {"params.yaml": {"foo": "qux", "xyz": "val"}} + } diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 529f1f6589..6cfd62645c 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -1,7 +1,9 @@ import os +from copy import deepcopy from textwrap import dedent import pytest +import yaml from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK from dvc.exceptions import CyclicGraphError @@ -485,3 +487,61 @@ def test_cyclic_graph_error(tmp_dir, dvc, run_copy): dump_stage_file(PIPELINE_FILE, data) with pytest.raises(CyclicGraphError): dvc.reproduce(":copy-baz-foo") + + +def test_repro_multiple_params(tmp_dir, dvc): + from tests.func.test_run_multistage import supported_params + + from dvc.serialize import get_params_deps + + with (tmp_dir / "params2.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + with (tmp_dir / "params.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + (tmp_dir / "foo").write_text("foo") + stage = dvc.run( + name="read_params", + deps=["foo"], + outs=["bar"], + params=[ + "params2.yaml:lists,floats,name", + "answer,floats,nested.nested1", + ], + cmd="cat params2.yaml params.yaml > bar", + ) + + params, deps = get_params_deps(stage) + assert len(params) == 2 + assert len(deps) == 1 + assert len(stage.outs) == 1 + + lockfile = stage.dvcfile._lockfile + assert lockfile.load()["read_params"]["params"] == { + "params2.yaml": { + "lists": [42, 42.0, "42"], + "floats": 42.0, + "name": "Answer", + }, + "params.yaml": { + "answer": 42, + "floats": 42.0, + "nested.nested1": {"nested2": "42", "nested2-2": 41.99999}, + }, + } + data, _ = stage.dvcfile._load() + assert data["stages"]["read_params"]["params"] == [ + {"params2.yaml": ["lists", "floats", "name"]}, + "answer", + "floats", + "nested.nested1", + ] + + assert not dvc.reproduce(stage.addressing) + with (tmp_dir / "params.yaml").open("w+") as f: + params = deepcopy(supported_params) + params["answer"] = 43 + yaml.dump(params, f) + + assert dvc.reproduce(stage.addressing) == [stage] diff --git a/tests/func/test_run_multistage.py b/tests/func/test_run_multistage.py index f258622738..a156ad55ff 100644 --- a/tests/func/test_run_multistage.py +++ b/tests/func/test_run_multistage.py @@ -1,6 +1,8 @@ import pytest import os +import yaml + from dvc.stage.exceptions import InvalidStageName, DuplicateStageName @@ -169,3 +171,62 @@ def test_run_already_exists(tmp_dir, dvc, run_copy): run_copy("foo", "bar", name="copy") with pytest.raises(DuplicateStageName): run_copy("bar", "foobar", name="copy") + + +supported_params = { + "name": "Answer", + "answer": 42, + "floats": 42.0, + "lists": [42, 42.0, "42"], + "nested": {"nested1": {"nested2": "42", "nested2-2": 41.99999}}, +} + + +def test_run_params_default(tmp_dir, dvc): + from dvc.dependency import ParamsDependency + + with (tmp_dir / "params.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + stage = dvc.run( + name="read_params", + params=["nested.nested1.nested2"], + cmd="cat params.yaml", + ) + isinstance(stage.deps[0], ParamsDependency) + assert stage.deps[0].params == ["nested.nested1.nested2"] + + lockfile = stage.dvcfile._lockfile + assert lockfile.load()["read_params"]["params"] == { + "params.yaml": {"nested.nested1.nested2": "42"} + } + + data, _ = stage.dvcfile._load() + assert data["stages"]["read_params"]["params"] == [ + "nested.nested1.nested2" + ] + + +def test_run_params_custom_file(tmp_dir, dvc): + from dvc.dependency import ParamsDependency + + with (tmp_dir / "params2.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + stage = dvc.run( + name="read_params", + params=["params2.yaml:lists"], + cmd="cat params2.yaml", + ) + + isinstance(stage.deps[0], ParamsDependency) + assert stage.deps[0].params == ["lists"] + lockfile = stage.dvcfile._lockfile + assert lockfile.load()["read_params"]["params"] == { + "params2.yaml": {"lists": [42, 42.0, "42"]} + } + + data, _ = stage.dvcfile._load() + assert data["stages"]["read_params"]["params"] == [ + {"params2.yaml": ["lists"]} + ] From 0583278d559d85f1951cb0c24e9ee83211a3ce52 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 29 Apr 2020 13:58:14 +0545 Subject: [PATCH 4/4] fix order of dictionary in Python 3.5 --- tests/func/test_repro_multistage.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 6cfd62645c..10b6ca4135 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -4,6 +4,7 @@ import pytest import yaml +from funcy import lsplit from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK from dvc.exceptions import CyclicGraphError @@ -531,12 +532,11 @@ def test_repro_multiple_params(tmp_dir, dvc): }, } data, _ = stage.dvcfile._load() - assert data["stages"]["read_params"]["params"] == [ - {"params2.yaml": ["lists", "floats", "name"]}, - "answer", - "floats", - "nested.nested1", - ] + params = data["stages"]["read_params"]["params"] + + custom, defaults = lsplit(lambda v: isinstance(v, dict), params) + assert set(custom[0]["params2.yaml"]) == {"name", "lists", "floats"} + assert set(defaults) == {"answer", "floats", "nested.nested1"} assert not dvc.reproduce(stage.addressing) with (tmp_dir / "params.yaml").open("w+") as f: