diff --git a/dvc/schema.py b/dvc/schema.py index 3a1417542e..d6a24d1473 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -21,6 +21,7 @@ LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, StageParams.PARAM_DEPS: [DATA_SCHEMA], + StageParams.PARAM_PARAMS: {str: {str: object}}, StageParams.PARAM_OUTS: [DATA_SCHEMA], } LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA} @@ -30,6 +31,7 @@ StageParams.PARAM_CMD: str, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [str], + Optional(StageParams.PARAM_PARAMS): [Any(str, {str: [str]})], Optional(StageParams.PARAM_LOCKED): bool, Optional(StageParams.PARAM_META): object, Optional(StageParams.PARAM_ALWAYS_CHANGED): bool, diff --git a/dvc/serialize.py b/dvc/serialize.py index 2e93729461..bc0bd8d3c6 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -1,11 +1,19 @@ from typing import TYPE_CHECKING +from funcy import rpartial, lsplit + +from dvc.dependency import ParamsDependency from dvc.utils.collections import apply_diff from dvc.utils.stage import parse_stage_for_update +from typing import List if TYPE_CHECKING: from dvc.stage import PipelineStage, Stage +PARAM_PATH = ParamsDependency.PARAM_PATH +PARAM_PARAMS = ParamsDependency.PARAM_PARAMS +DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE + def _get_outs(stage: "PipelineStage"): outs_bucket = {} @@ -21,14 +29,50 @@ def _get_outs(stage: "PipelineStage"): return outs_bucket +def get_params_deps(stage: "PipelineStage"): + return lsplit(rpartial(isinstance, ParamsDependency), stage.deps) + + +def _serialize_params(params: List[ParamsDependency]): + """Return two types of values from stage: + + `keys` - which is a list of params without values, used in a pipeline file + + which is in the shape of: + ['lr', 'train', {'params2.yaml': ['lr']}] + `key_vals` - which is a dict of params with values, used in a lockfile + which is in the shape of: + {'params.yaml': {'lr': '1', 'train': 2}, 'params2.yaml': {'lr': '1'}} + """ + keys = [] +
key_vals = {} + + for param_dep in params: + dump = param_dep.dumpd() + path, params = dump[PARAM_PATH], dump[PARAM_PARAMS] + k = list(params.keys()) + if not k: + continue + # if it's not a default file, change the shape + # to: {path: k} + keys.extend(k if path == DEFAULT_PARAMS_FILE else [{path: k}]) + key_vals.update({path: params}) + + return keys, key_vals + + def to_pipeline_file(stage: "PipelineStage"): + params, deps = get_params_deps(stage) + serialized_params, _ = _serialize_params(params) + return { stage.name: { key: value for key, value in { stage.PARAM_CMD: stage.cmd, stage.PARAM_WDIR: stage.resolve_wdir(), - stage.PARAM_DEPS: [d.def_path for d in stage.deps], + stage.PARAM_DEPS: [d.def_path for d in deps], + stage.PARAM_PARAMS: serialized_params, **_get_outs(stage), stage.PARAM_LOCKED: stage.locked, stage.PARAM_ALWAYS_CHANGED: stage.always_changed, @@ -43,17 +87,20 @@ def to_lockfile(stage: "PipelineStage") -> dict: assert stage.name res = {"cmd": stage.cmd} + params, deps = get_params_deps(stage) deps = [ {"path": dep.def_path, dep.checksum_type: dep.get_checksum()} - for dep in stage.deps + for dep in deps ] outs = [ {"path": out.def_path, out.checksum_type: out.get_checksum()} for out in stage.outs ] - if stage.deps: + if deps: res["deps"] = deps - if stage.outs: + if params: + _, res["params"] = _serialize_params(params) + if outs: res["outs"] = outs return {stage.name: res} diff --git a/dvc/stage/loader.py b/dvc/stage/loader.py index 9636956010..aefe42789b 100644 --- a/dvc/stage/loader.py +++ b/dvc/stage/loader.py @@ -1,16 +1,22 @@ -import collections import logging import os from copy import deepcopy +from collections import defaultdict, Mapping from itertools import chain +from funcy import first + from dvc import dependency, output from .exceptions import StageNameUnspecified, StageNotFound +from ..dependency import ParamsDependency logger = logging.getLogger(__name__) +DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE + + def 
resolve_paths(path, wdir=None): path = os.path.abspath(path) wdir = wdir or os.curdir @@ -18,7 +24,7 @@ def resolve_paths(path, wdir=None): return path, wdir -class StageLoader(collections.abc.Mapping): +class StageLoader(Mapping): def __init__(self, dvcfile, stages_data, lockfile_data=None): self.dvcfile = dvcfile self.stages_data = stages_data or {} @@ -54,6 +60,64 @@ def _fill_lock_checksums(stage, lock_data): .get(item.checksum_type) ) + @classmethod + def _load_params(cls, stage, pipeline_params, lock_params=None): + """ + Params in the pipeline file are expected to be in the following format: + ``` + params: + - lr + - train.epochs + - params2.yaml: # notice the filename + - process.threshold + - process.bow + ``` + + and, in the lockfile, we keep them in the following format: + ``` + params: + params.yaml: + lr: 0.0041 + train.epochs: 100 + params2.yaml: + process.threshold: 0.98 + process.bow: + - 15000 + - 123 + ``` + + So, here, we merge these two formats into one (ignoring params only + specified in the lockfile but missing from the pipeline file), and + load the `ParamsDependency` for the given stage. + + In the list of `params` inside the pipeline file, if an item is + dict-like, its key is treated as a separate params file and its + values as part of that params file; otherwise the item is considered + part of `params.yaml`, which is the default file. + + (From the example above: `lr` is considered to be part of `params.yaml` + whereas `process.bow` is part of `params2.yaml`.)
+ """ + res = defaultdict(lambda: defaultdict(dict)) + lock_params = lock_params or {} + + def get_value(file, param): + return lock_params.get(file, {}).get(param) + + for key in pipeline_params: + if isinstance(key, str): + path = DEFAULT_PARAMS_FILE + res[path][key] = get_value(path, key) + elif isinstance(key, dict): + path = first(key) + for k in key[path]: + res[path][k] = get_value(path, k) + + stage.deps += dependency.loadd_from( + stage, + [{"path": key, "params": params} for key, params in res.items()], + ) + @classmethod def load_stage(cls, dvcfile, name, stage_data, lock_data): from . import PipelineStage, Stage, loads_from @@ -63,8 +127,10 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data): ) stage = loads_from(PipelineStage, dvcfile.repo, path, wdir, stage_data) stage.name = name + params = stage_data.pop("params", {}) stage._fill_stage_dependencies(**stage_data) stage._fill_stage_outputs(**stage_data) + cls._load_params(stage, params, lock_data.get("params")) if lock_data: stage.cmd_changed = lock_data.get( Stage.PARAM_CMD @@ -102,7 +168,7 @@ def __contains__(self, name): return name in self.stages_data -class SingleStageLoader(collections.abc.Mapping): +class SingleStageLoader(Mapping): def __init__(self, dvcfile, stage_data, stage_text=None, tag=None): self.dvcfile = dvcfile self.stage_data = stage_data or {} diff --git a/dvc/stage/params.py b/dvc/stage/params.py index 6ea8aa65f7..9ae388636d 100644 --- a/dvc/stage/params.py +++ b/dvc/stage/params.py @@ -10,6 +10,7 @@ class StageParams: PARAM_LOCKED = "locked" PARAM_META = "meta" PARAM_ALWAYS_CHANGED = "always_changed" + PARAM_PARAMS = "params" class OutputParams(Enum): diff --git a/tests/func/params/test_diff.py b/tests/func/params/test_diff.py index 0f8dc31a86..3dab45066d 100644 --- a/tests/func/params/test_diff.py +++ b/tests/func/params/test_diff.py @@ -102,3 +102,20 @@ def test_diff_with_unchanged(tmp_dir, scm, dvc): "xyz": {"old": "val", "new": "val"}, } } + + +def 
test_pipeline_tracked_params(tmp_dir, scm, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.gen({"foo": "foo", "params.yaml": "foo: bar\nxyz: val"}) + run_copy("foo", "bar", name="copy-foo-bar", params=["foo,xyz"]) + + scm.add(["params.yaml", PIPELINE_FILE]) + scm.commit("add stage") + + tmp_dir.scm_gen("params.yaml", "foo: baz\nxyz: val", commit="baz") + tmp_dir.scm_gen("params.yaml", "foo: qux\nxyz: val", commit="qux") + + assert dvc.params.diff(a_rev="HEAD~2") == { + "params.yaml": {"foo": {"old": "bar", "new": "qux"}} + } diff --git a/tests/func/params/test_show.py b/tests/func/params/test_show.py index b52d9e9a12..e1f72cb3c4 100644 --- a/tests/func/params/test_show.py +++ b/tests/func/params/test_show.py @@ -42,3 +42,19 @@ def test_show_branch(tmp_dir, scm, dvc): "working tree": {"params.yaml": {"foo": "bar"}}, "branch": {"params.yaml": {"foo": "baz"}}, } + + +def test_pipeline_tracked_params(tmp_dir, scm, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.gen({"foo": "foo", "params.yaml": "foo: bar\nxyz: val"}) + run_copy("foo", "bar", name="copy-foo-bar", params=["foo,xyz"]) + scm.add(["params.yaml", PIPELINE_FILE]) + scm.commit("add stage") + + tmp_dir.scm_gen("params.yaml", "foo: baz\nxyz: val", commit="baz") + tmp_dir.scm_gen("params.yaml", "foo: qux\nxyz: val", commit="qux") + + assert dvc.params.show(revs=["master"]) == { + "master": {"params.yaml": {"foo": "qux", "xyz": "val"}} + } diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 529f1f6589..10b6ca4135 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -1,7 +1,10 @@ import os +from copy import deepcopy from textwrap import dedent import pytest +import yaml +from funcy import lsplit from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK from dvc.exceptions import CyclicGraphError @@ -485,3 +488,60 @@ def test_cyclic_graph_error(tmp_dir, dvc, run_copy): dump_stage_file(PIPELINE_FILE, 
data) with pytest.raises(CyclicGraphError): dvc.reproduce(":copy-baz-foo") + + +def test_repro_multiple_params(tmp_dir, dvc): + from tests.func.test_run_multistage import supported_params + + from dvc.serialize import get_params_deps + + with (tmp_dir / "params2.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + with (tmp_dir / "params.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + (tmp_dir / "foo").write_text("foo") + stage = dvc.run( + name="read_params", + deps=["foo"], + outs=["bar"], + params=[ + "params2.yaml:lists,floats,name", + "answer,floats,nested.nested1", + ], + cmd="cat params2.yaml params.yaml > bar", + ) + + params, deps = get_params_deps(stage) + assert len(params) == 2 + assert len(deps) == 1 + assert len(stage.outs) == 1 + + lockfile = stage.dvcfile._lockfile + assert lockfile.load()["read_params"]["params"] == { + "params2.yaml": { + "lists": [42, 42.0, "42"], + "floats": 42.0, + "name": "Answer", + }, + "params.yaml": { + "answer": 42, + "floats": 42.0, + "nested.nested1": {"nested2": "42", "nested2-2": 41.99999}, + }, + } + data, _ = stage.dvcfile._load() + params = data["stages"]["read_params"]["params"] + + custom, defaults = lsplit(lambda v: isinstance(v, dict), params) + assert set(custom[0]["params2.yaml"]) == {"name", "lists", "floats"} + assert set(defaults) == {"answer", "floats", "nested.nested1"} + + assert not dvc.reproduce(stage.addressing) + with (tmp_dir / "params.yaml").open("w+") as f: + params = deepcopy(supported_params) + params["answer"] = 43 + yaml.dump(params, f) + + assert dvc.reproduce(stage.addressing) == [stage] diff --git a/tests/func/test_run_multistage.py b/tests/func/test_run_multistage.py index f258622738..a156ad55ff 100644 --- a/tests/func/test_run_multistage.py +++ b/tests/func/test_run_multistage.py @@ -1,6 +1,8 @@ import pytest import os +import yaml + from dvc.stage.exceptions import InvalidStageName, DuplicateStageName @@ -169,3 +171,62 @@ def test_run_already_exists(tmp_dir, dvc, 
run_copy): run_copy("foo", "bar", name="copy") with pytest.raises(DuplicateStageName): run_copy("bar", "foobar", name="copy") + + +supported_params = { + "name": "Answer", + "answer": 42, + "floats": 42.0, + "lists": [42, 42.0, "42"], + "nested": {"nested1": {"nested2": "42", "nested2-2": 41.99999}}, +} + + +def test_run_params_default(tmp_dir, dvc): + from dvc.dependency import ParamsDependency + + with (tmp_dir / "params.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + stage = dvc.run( + name="read_params", + params=["nested.nested1.nested2"], + cmd="cat params.yaml", + ) + assert isinstance(stage.deps[0], ParamsDependency) + assert stage.deps[0].params == ["nested.nested1.nested2"] + + lockfile = stage.dvcfile._lockfile + assert lockfile.load()["read_params"]["params"] == { + "params.yaml": {"nested.nested1.nested2": "42"} + } + + data, _ = stage.dvcfile._load() + assert data["stages"]["read_params"]["params"] == [ + "nested.nested1.nested2" + ] + + +def test_run_params_custom_file(tmp_dir, dvc): + from dvc.dependency import ParamsDependency + + with (tmp_dir / "params2.yaml").open("w+") as f: + yaml.dump(supported_params, f) + + stage = dvc.run( + name="read_params", + params=["params2.yaml:lists"], + cmd="cat params2.yaml", + ) + + assert isinstance(stage.deps[0], ParamsDependency) + assert stage.deps[0].params == ["lists"] + lockfile = stage.dvcfile._lockfile + assert lockfile.load()["read_params"]["params"] == { + "params2.yaml": {"lists": [42, 42.0, "42"]} + } + + data, _ = stage.dvcfile._load() + assert data["stages"]["read_params"]["params"] == [ + {"params2.yaml": ["lists"]} + ]