From bbca2e5096543fc5fc2b3bb095fd59dd34dbc136 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 21 Jul 2020 18:10:09 +0900 Subject: [PATCH 01/17] experiments: add initial local tmpdir executor --- dvc/repo/experiments/__init__.py | 30 ++++++++++-- dvc/repo/experiments/executor.py | 81 ++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 dvc/repo/experiments/executor.py diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index dcbe550354..ea158c1169 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -6,10 +6,11 @@ from funcy import cached_property from dvc.exceptions import DvcException +from dvc.repo.experiments.executor import LocalExecutor from dvc.scm.git import Git from dvc.stage.serialize import to_lockfile from dvc.utils import dict_sha256, env2bool, relpath -from dvc.utils.fs import remove +from dvc.utils.fs import copyfile, remove logger = logging.getLogger(__name__) @@ -47,10 +48,13 @@ def scm(self): return Git(self.exp_dir) return self._init_clone() + @cached_property + def dvc_dir(self): + return relpath(self.repo.dvc_dir, self.repo.scm.root_dir) + @cached_property def exp_dvc_dir(self): - dvc_dir = relpath(self.repo.dvc_dir, self.repo.scm.root_dir) - return os.path.join(self.exp_dir, dvc_dir) + return os.path.join(self.exp_dir, self.dvc_dir) @cached_property def exp_dvc(self): @@ -148,13 +152,29 @@ def new(self, *args, workspace=True, **kwargs): else: # configure params via command line here pass - self.exp_dvc.checkout() - stages = self._reproduce(*args, **kwargs) + stages = self._run_local(rev, *args, **kwargs) + # self.exp_dvc.checkout() + # stages = self._reproduce(*args, **kwargs) exp_rev = self._commit(stages, rev=rev) self.checkout_exp(exp_rev, force=True) logger.info("Generated experiment '%s'.", exp_rev[:7]) return stages + def _run_local(self, rev, *args, **kwargs): + tree = self.scm.get_tree(rev) + executor = LocalExecutor( + tree, + dvc_dir=self.dvc_dir, + cache_dir=self.repo.cache.local.cache_dir, + ) + stages = executor.run(*args, **kwargs) + logger.debug("copying tmp output from '%s'", executor.tmp_dir) + for fname in tree.walk_files(tree.tree_root): + src = executor.path_info / relpath(fname, tree.tree_root) + copyfile(src, fname) + executor.cleanup() + return stages + def checkout_exp(self, rev, force=False): """Checkout an experiment to the user's workspace.""" from git.exc import GitCommandError diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py new file mode 100644 index 0000000000..ee9ef393d5 --- /dev/null +++ b/dvc/repo/experiments/executor.py @@ -0,0 +1,81 @@ +import logging +import os +from contextlib import contextmanager +from tempfile import TemporaryDirectory + +from funcy import cached_property + +from dvc.path_info import PathInfo +from dvc.tree.base import BaseTree +from dvc.utils import relpath +from dvc.utils.fs import copyfile, makedirs + +logger = logging.getLogger(__name__) + + +class ExperimentExecutor: + """Base class for executing experiments in parallel.""" + + def __init__(self, src_tree: BaseTree, **kwargs): + pass + + def run(self, *args, **kwargs): + pass + + def cleanup(self): + pass + + +class LocalExecutor(ExperimentExecutor): + def __init__(self, src_tree: BaseTree, **kwargs): + dvc_dir = kwargs.pop("dvc_dir") + cache_dir = kwargs.pop("cache_dir") + super().__init__(src_tree, **kwargs) + self.tmp_dir = TemporaryDirectory() + logger.debug("Init local executor in dir '%s'.", self.tmp_dir) + self.dvc_dir = os.path.join(self.tmp_dir.name, dvc_dir) + try: + for fname in src_tree.walk_files(src_tree.tree_root): + dest = self.path_info / relpath(fname, src_tree.tree_root) + if not os.path.exists(dest.parent): + makedirs(dest.parent) + copyfile(fname, dest) + except Exception: + self.tmp_dir.cleanup() + raise + self._config(cache_dir) + + def _config(self, cache_dir): + local_config = os.path.join(self.dvc_dir, "config.local") + logger.debug("Writing experiments local config '%s'", local_config) + with open(local_config, "w") as fobj: + fobj.write("[core]\n no_scm = true\n") + fobj.write(f"[cache]\n dir = {cache_dir}") + + @cached_property + def dvc(self): + from dvc.repo import Repo + + return Repo(self.dvc_dir) + + @cached_property + def path_info(self): + return PathInfo(self.tmp_dir.name) + + @contextmanager + def chdir(self): + cwd = os.getcwd() + os.chdir(self.dvc.root_dir) + yield + os.chdir(cwd) + + def run(self, *args, **kwargs): + logger.debug("Running repro in '%s'", self.tmp_dir) + with self.chdir(): + self.dvc.checkout() + return self.dvc.reproduce(*args, **kwargs) + + def cleanup(self): + logger.debug("Removing tmpdir '%s'", self.tmp_dir) + self.tmp_dir.cleanup() + super().cleanup() From 703275c7a72befc6ec77e8dd022e011b5c098506 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 22 Jul 2020 15:25:39 +0900 Subject: [PATCH 02/17] fix detached head pull --- dvc/repo/experiments/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index ea158c1169..2a83c61a80 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -94,8 +94,11 @@ def _config_clone(self): def _scm_checkout(self, rev): self.scm.repo.git.reset(hard=True) + if self.scm.repo.head.is_detached: + # switch back to default branch + self.scm.repo.heads[0].checkout() if not Git.is_sha(rev) or not self.scm.has_rev(rev): - self.scm.fetch(all=True) + self.scm.pull() logger.debug("Checking out base experiment commit '%s'", rev) self.scm.checkout(rev) From 83a4bd039a40444d5a0fb1e02c4e68563ce210de Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 22 Jul 2020 16:09:28 +0900 Subject: [PATCH 03/17] experiments: include unchanged (unrepro'd) stages in experiment hash --- dvc/repo/experiments/__init__.py | 10 ++++------ dvc/repo/experiments/executor.py | 12 +++++++++++- dvc/repo/reproduce.py | 7 +++++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 2a83c61a80..ed1e7051ea 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -155,10 +155,8 @@ def new(self, *args, workspace=True, **kwargs): else: # configure params via command line here pass - stages = self._run_local(rev, *args, **kwargs) - # self.exp_dvc.checkout() - # stages = self._reproduce(*args, **kwargs) - exp_rev = self._commit(stages, rev=rev) + stages, unchanged = self._run_local(rev, *args, **kwargs) + exp_rev = self._commit(stages + unchanged, rev=rev) self.checkout_exp(exp_rev, force=True) logger.info("Generated experiment '%s'.", exp_rev[:7]) return stages @@ -170,13 +168,13 @@ def _run_local(self, rev, *args, **kwargs): dvc_dir=self.dvc_dir, cache_dir=self.repo.cache.local.cache_dir, ) - stages = executor.run(*args, **kwargs) + stages, unchanged = executor.run(*args, **kwargs) logger.debug("copying tmp output from '%s'", executor.tmp_dir) for fname in tree.walk_files(tree.tree_root): src = executor.path_info / relpath(fname, tree.tree_root) copyfile(src, fname) executor.cleanup() - return stages + return stages, unchanged def checkout_exp(self, rev, force=False): """Checkout an experiment to the user's workspace.""" diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index ee9ef393d5..218fcffb7f 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -6,6 +6,7 @@ from funcy import cached_property from dvc.path_info import PathInfo +from dvc.stage import PipelineStage from dvc.tree.base import BaseTree from dvc.utils import relpath from dvc.utils.fs import copyfile, makedirs @@ -70,10 +71,19 @@ def chdir(self): os.chdir(cwd) def run(self, *args, **kwargs): + unchanged = [] + + def filter_pipeline(stage): + if isinstance(stage, PipelineStage): + unchanged.append(stage) + logger.debug("Running repro in '%s'", self.tmp_dir) with self.chdir(): self.dvc.checkout() - return self.dvc.reproduce(*args, **kwargs) + stages = self.dvc.reproduce( + *args, on_unchanged=filter_pipeline, **kwargs, + ) + return stages, unchanged def cleanup(self): logger.debug("Removing tmpdir '%s'", self.tmp_dir) diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index e127c4ea42..1a728958bc 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -110,7 +110,7 @@ def reproduce( def _reproduce_stages( - G, stages, downstream=False, single_item=False, **kwargs + G, stages, downstream=False, single_item=False, on_unchanged=None, **kwargs ): r"""Derive the evaluation of the given node for the given graph. @@ -194,7 +194,10 @@ def _reproduce_stages( # dependencies didn't change. kwargs["force"] = True - result.extend(ret) + if ret: + result.extend(ret) + elif on_unchanged is not None: + on_unchanged(stage) except Exception as exc: raise ReproductionError(stage.relpath) from exc From aa1e0440c9e2bad5f73df015753919e2ead3039e Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 22 Jul 2020 17:01:17 +0900 Subject: [PATCH 04/17] experiments: repro single experiment using tmpdir executor by default --- dvc/repo/experiments/__init__.py | 45 +++++++++++++++++++++++--------- dvc/repo/experiments/executor.py | 9 ++++++- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index ed1e7051ea..dfd16b097a 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -1,12 +1,14 @@ import logging import os import tempfile +from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import contextmanager +from typing import Iterable from funcy import cached_property from dvc.exceptions import DvcException -from dvc.repo.experiments.executor import LocalExecutor +from dvc.repo.experiments.executor import ExperimentExecutor, LocalExecutor from dvc.scm.git import Git from dvc.stage.serialize import to_lockfile from dvc.utils import dict_sha256, env2bool, relpath @@ -155,26 +157,44 @@ def new(self, *args, workspace=True, **kwargs): else: # configure params via command line here pass - stages, unchanged = self._run_local(rev, *args, **kwargs) + executor = LocalExecutor( + self.scm.get_tree(rev), + dvc_dir=self.dvc_dir, + cache_dir=self.repo.cache.local.cache_dir, + ) + + self._run([executor], *args, **kwargs) + stages, unchanged = executor.result + self._collect_output(rev, executor) + executor.cleanup() + exp_rev = self._commit(stages + unchanged, rev=rev) self.checkout_exp(exp_rev, force=True) logger.info("Generated experiment '%s'.", exp_rev[:7]) return stages - def _run_local(self, rev, *args, **kwargs): - tree = self.scm.get_tree(rev) - executor = LocalExecutor( - tree, - dvc_dir=self.dvc_dir, - cache_dir=self.repo.cache.local.cache_dir, - ) - stages, unchanged = executor.run(*args, **kwargs) + def _run(self, executors: Iterable, *args, **kwargs): + """Run the specified ExperimentExecutors in parallel. + + All experiments will be reproduced with the same `dvc repro` options + (via *args, **kwargs). + """ + # TODO: setup jobs + with ThreadPoolExecutor(max_workers=1) as thread_exec: + futures = [ + thread_exec.submit(executor.run, *args, **kwargs) + for executor in executors + ] + for _ in as_completed(futures): + # TODO: collect repro errors + pass + + def _collect_output(self, rev: str, executor: ExperimentExecutor): logger.debug("copying tmp output from '%s'", executor.tmp_dir) + tree = self.scm.get_tree(rev) for fname in tree.walk_files(tree.tree_root): src = executor.path_info / relpath(fname, tree.tree_root) copyfile(src, fname) - executor.cleanup() - return stages, unchanged def checkout_exp(self, rev, force=False): """Checkout an experiment to the user's workspace.""" @@ -183,7 +203,6 @@ def checkout_exp(self, rev, force=False): if force: self.repo.scm.repo.git.reset(hard=True) - logger.debug(f"checkout {rev}") self._scm_checkout(rev) tmp = tempfile.NamedTemporaryFile(delete=False).name diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index 218fcffb7f..8f6115f85a 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -45,6 +45,8 @@ def __init__(self, src_tree: BaseTree, **kwargs): self.tmp_dir.cleanup() raise self._config(cache_dir) + self._stages = None + self._unchanged = None def _config(self, cache_dir): local_config = os.path.join(self.dvc_dir, "config.local") @@ -63,6 +65,10 @@ def dvc(self): def path_info(self): return PathInfo(self.tmp_dir.name) + @property + def result(self): + return self._stages, self._unchanged + @contextmanager def chdir(self): cwd = os.getcwd() @@ -83,7 +89,8 @@ def filter_pipeline(stage): stages = self.dvc.reproduce( *args, on_unchanged=filter_pipeline, **kwargs, ) - return stages, unchanged + self._stages = stages + self._unchanged = unchanged def cleanup(self): logger.debug("Removing tmpdir '%s'", self.tmp_dir) From 48b74e0539c81a574a6b0adf86b8c4265082b83e Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 23 Jul 2020 17:21:11 +0900 Subject: [PATCH 05/17] stage experiments as stash commits before running --- dvc/repo/experiments/__init__.py | 35 +++++++++++++++++++------------- dvc/repo/experiments/executor.py | 5 +++-- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index dfd16b097a..2f67a62297 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -104,9 +104,8 @@ def _scm_checkout(self, rev): logger.debug("Checking out base experiment commit '%s'", rev) self.scm.checkout(rev) - def _patch_exp(self): - """Create a patch based on the current (parent) workspace and apply it - to the experiment workspace. + def _stash_exp(self): + """Stash changes from the current (parent) workspace as an experiment. """ tmp = tempfile.NamedTemporaryFile(delete=False).name try: @@ -120,6 +119,10 @@ def _patch_exp(self): ) finally: remove(tmp) + rev = self.scm.get_rev() + msg = f"Stashed experiment on {rev[:7]}" + self.scm.repo.git.stash("push", "-m", msg) + return self.scm.resolve_rev("stash@{0}") def _commit(self, stages, check_exists=True, branch=True, rev=None): """Commit stages as an experiment and return the commit SHA.""" @@ -150,23 +153,27 @@ def new(self, *args, workspace=True, **kwargs): self._scm_checkout(rev) if workspace: try: - self._patch_exp() + exp_rev = self._stash_exp() except UnchangedExperimentError as exc: logger.info("Reproducing existing experiment '%s'.", rev[:7]) raise exc else: # configure params via command line here pass - executor = LocalExecutor( - self.scm.get_tree(rev), - dvc_dir=self.dvc_dir, - cache_dir=self.repo.cache.local.cache_dir, - ) - - self._run([executor], *args, **kwargs) - stages, unchanged = executor.result - self._collect_output(rev, executor) - executor.cleanup() + + try: + executor = LocalExecutor( + self.scm.get_tree(exp_rev), + dvc_dir=self.dvc_dir, + cache_dir=self.repo.cache.local.cache_dir, + ) + + self._run([executor], *args, **kwargs) + stages, unchanged = executor.result + self._collect_output(rev, executor) + executor.cleanup() + finally: + self.scm.repo.git.stash("drop") exp_rev = self._commit(stages + unchanged, rev=rev) self.checkout_exp(exp_rev, force=True) diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index 8f6115f85a..395b5ac2ad 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -9,7 +9,7 @@ from dvc.stage import PipelineStage from dvc.tree.base import BaseTree from dvc.utils import relpath -from dvc.utils.fs import copyfile, makedirs +from dvc.utils.fs import copy_fobj_to_file, makedirs logger = logging.getLogger(__name__) @@ -40,7 +40,8 @@ def __init__(self, src_tree: BaseTree, **kwargs): dest = self.path_info / relpath(fname, src_tree.tree_root) if not os.path.exists(dest.parent): makedirs(dest.parent) - copyfile(fname, dest) + with src_tree.open(fname, "rb") as fobj: + copy_fobj_to_file(fobj, dest) except Exception: self.tmp_dir.cleanup() raise From 7a53923e50869cd6b260199028051f4977c99cdf Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 24 Jul 2020 16:34:25 +0900 Subject: [PATCH 06/17] include repro args/kwargs in stashed experiments --- dvc/repo/experiments/__init__.py | 28 ++++++++++++++++++------- dvc/repo/experiments/executor.py | 35 +++++++++++++++++++++++++++----- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 2f67a62297..2f9ee73642 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -29,6 +29,7 @@ class Experiments: """ EXPERIMENTS_DIR = "experiments" + PACKED_ARGS_FILE = "repro.dat" def __init__(self, repo): if not ( @@ -104,7 +105,7 @@ def _scm_checkout(self, rev): logger.debug("Checking out base experiment commit '%s'", rev) self.scm.checkout(rev) - def _stash_exp(self): + def _stash_exp(self, *args, **kwargs): """Stash changes from the current (parent) workspace as an experiment. """ tmp = tempfile.NamedTemporaryFile(delete=False).name @@ -120,10 +121,20 @@ def _stash_exp(self): finally: remove(tmp) rev = self.scm.get_rev() + self._pack_args(*args, **kwargs) msg = f"Stashed experiment on {rev[:7]}" self.scm.repo.git.stash("push", "-m", msg) return self.scm.resolve_rev("stash@{0}") + def _pack_args(self, *args, **kwargs): + args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) + ExperimentExecutor.pack_repro_args(args_file, *args, **kwargs) + self.scm.add(args_file) + + def _unpack_args(self, tree=None): + args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) + return ExperimentExecutor.unpack_repro_args(args_file, tree=tree) + def _commit(self, stages, check_exists=True, branch=True, rev=None): """Commit stages as an experiment and return the commit SHA.""" hash_ = self.exp_hash(stages) @@ -153,7 +164,7 @@ def new(self, *args, workspace=True, **kwargs): self._scm_checkout(rev) if workspace: try: - exp_rev = self._stash_exp() + exp_rev = self._stash_exp(*args, **kwargs) except UnchangedExperimentError as exc: logger.info("Reproducing existing experiment '%s'.", rev[:7]) raise exc @@ -162,13 +173,17 @@ def new(self, *args, workspace=True, **kwargs): pass try: + tree = self.scm.get_tree(exp_rev) + repro_args, repro_kwargs = self._unpack_args(tree) executor = LocalExecutor( - self.scm.get_tree(exp_rev), + tree, + repro_args=repro_args, + repro_kwargs=repro_kwargs, dvc_dir=self.dvc_dir, cache_dir=self.repo.cache.local.cache_dir, ) - self._run([executor], *args, **kwargs) + self._run([executor]) stages, unchanged = executor.result self._collect_output(rev, executor) executor.cleanup() @@ -180,7 +195,7 @@ def new(self, *args, workspace=True, **kwargs): logger.info("Generated experiment '%s'.", exp_rev[:7]) return stages - def _run(self, executors: Iterable, *args, **kwargs): + def _run(self, executors: Iterable): """Run the specified ExperimentExecutors in parallel. All experiments will be reproduced with the same `dvc repro` options @@ -189,8 +204,7 @@ def _run(self, executors: Iterable, *args, **kwargs): # TODO: setup jobs with ThreadPoolExecutor(max_workers=1) as thread_exec: futures = [ - thread_exec.submit(executor.run, *args, **kwargs) - for executor in executors + thread_exec.submit(executor.run) for executor in executors ] for _ in as_completed(futures): # TODO: collect repro errors diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index 395b5ac2ad..bbe38532d6 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -1,5 +1,6 @@ import logging import os +import pickle from contextlib import contextmanager from tempfile import TemporaryDirectory @@ -15,17 +16,39 @@ class ExperimentExecutor: - """Base class for executing experiments in parallel.""" + """Base class for executing experiments in parallel. + + Keyword args: + repro_args: Args to be passed into reproduce. + repro_kwargs: Keyword args to be passed into reproduce. + """ def __init__(self, src_tree: BaseTree, **kwargs): - pass + self.src_tree = src_tree + self.repro_args = kwargs.pop("repro_args", []) + self.repro_kwargs = kwargs.pop("repro_kwargs", {}) - def run(self, *args, **kwargs): + def run(self): pass def cleanup(self): pass + # TODO: come up with better way to stash repro arguments + @staticmethod + def pack_repro_args(path, *args, tree=None, **kwargs): + open_func = tree.open if tree else open + data = {"args": args, "kwargs": kwargs} + with open_func(path, "wb") as fobj: + pickle.dump(data, fobj) + + @staticmethod + def unpack_repro_args(path, tree=None): + open_func = tree.open if tree else open + with open_func(path, "rb") as fobj: + data = pickle.load(fobj) + return data["args"], data["kwargs"] + class LocalExecutor(ExperimentExecutor): def __init__(self, src_tree: BaseTree, **kwargs): @@ -77,7 +100,7 @@ def chdir(self): yield os.chdir(cwd) - def run(self, *args, **kwargs): + def run(self): unchanged = [] def filter_pipeline(stage): @@ -88,7 +111,9 @@ def filter_pipeline(stage): with self.chdir(): self.dvc.checkout() stages = self.dvc.reproduce( - *args, on_unchanged=filter_pipeline, **kwargs, + *self.repro_args, + on_unchanged=filter_pipeline, + **self.repro_kwargs, ) self._stages = stages self._unchanged = unchanged From 41b53b44c46876f6f9cd3eee72812efeb3dda13f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 24 Jul 2020 20:54:14 +0900 Subject: [PATCH 07/17] support running arbitrary experiment commits (including stash commits) --- dvc/repo/experiments/__init__.py | 188 ++++++++++++++++++++++++------- dvc/repo/experiments/executor.py | 27 ++--- dvc/repo/reproduce.py | 2 +- 3 files changed, 161 insertions(+), 56 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 2f9ee73642..23f4c74b22 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -1,9 +1,10 @@ import logging import os +import re import tempfile from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import contextmanager -from typing import Iterable +from typing import Iterable, Optional from funcy import cached_property @@ -18,7 +19,9 @@ class UnchangedExperimentError(DvcException): - pass + def __init__(self, rev): + super().__init__("Experiment identical to baseline '{rev[:7]}'.") + self.rev = rev class Experiments: @@ -30,6 +33,10 @@ class Experiments: EXPERIMENTS_DIR = "experiments" PACKED_ARGS_FILE = "repro.dat" + STASH_MSG_PREFIX = "dvc-exp-" + STASH_EXPERIMENT_RE = re.compile( + r"(?:On \(.*\): )dvc-exp-(?P[0-9a-f]+)$" + ) def __init__(self, repo): if not ( @@ -66,6 +73,23 @@ def exp_dvc(self): return Repo(self.exp_dvc_dir) + @cached_property + def args_file(self): + return os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) + + @property + def stash_reflog(self): + return self.scm.repo.refs["refs/stash"].log() + + @property + def stash_revs(self): + revs = {} + for i, entry in enumerate(self.stash_reflog): + m = self.STASH_EXPERIMENT_RE.match(entry.message) + if m: + revs[entry.newhexsha] = (i, m.group("baseline_rev")) + return revs + @staticmethod def exp_hash(stages): exp_data = {} @@ -108,6 +132,7 @@ def _scm_checkout(self, rev): def _stash_exp(self, *args, **kwargs): """Stash changes from the current (parent) workspace as an experiment. """ + rev = self.scm.get_rev() tmp = tempfile.NamedTemporaryFile(delete=False).name try: self.repo.scm.repo.git.diff(patch=True, output=tmp) @@ -115,28 +140,27 @@ def _stash_exp(self, *args, **kwargs): logger.debug("Patching experiment workspace") self.scm.repo.git.apply(tmp) else: - raise UnchangedExperimentError( - "Experiment identical to baseline commit." - ) + raise UnchangedExperimentError(rev) finally: remove(tmp) - rev = self.scm.get_rev() self._pack_args(*args, **kwargs) - msg = f"Stashed experiment on {rev[:7]}" + msg = f"{self.STASH_MSG_PREFIX}{rev}" self.scm.repo.git.stash("push", "-m", msg) return self.scm.resolve_rev("stash@{0}") def _pack_args(self, *args, **kwargs): - args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) - ExperimentExecutor.pack_repro_args(args_file, *args, **kwargs) - self.scm.add(args_file) + ExperimentExecutor.pack_repro_args(self.args_file, *args, **kwargs) + self.scm.add(self.args_file) def _unpack_args(self, tree=None): args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) return ExperimentExecutor.unpack_repro_args(args_file, tree=tree) - def _commit(self, stages, check_exists=True, branch=True, rev=None): + def _commit(self, stages, check_exists=True, branch=True): """Commit stages as an experiment and return the commit SHA.""" + if not self.scm.is_dirty(): + raise UnchangedExperimentError(self.scm.get_rev()) + rev = self.scm.get_rev() hash_ = self.exp_hash(stages) exp_name = f"{rev[:7]}-{hash_}" if branch: @@ -149,10 +173,13 @@ def _commit(self, stages, check_exists=True, branch=True, rev=None): self.scm.commit(f"Add experiment {exp_name}") return self.scm.get_rev() - def _reproduce(self, *args, **kwargs): - """Run `dvc repro` inside the experiments workspace.""" - with self.chdir(): - return self.exp_dvc.reproduce(*args, **kwargs) + def reproduce_one(self, *args, **kwargs): + """Reproduce and checkout a single experiment.""" + stash_rev = self.new(**kwargs) + result = self.reproduce([stash_rev], keep_stash=False) + for exp_rev, (stages, _) in result.items(): + self.checkout_exp(exp_rev, force=True) + return stages def new(self, *args, workspace=True, **kwargs): """Create a new experiment. @@ -164,51 +191,128 @@ def new(self, *args, workspace=True, **kwargs): self._scm_checkout(rev) if workspace: try: - exp_rev = self._stash_exp(*args, **kwargs) + stash_rev = self._stash_exp(*args, **kwargs) except UnchangedExperimentError as exc: logger.info("Reproducing existing experiment '%s'.", rev[:7]) raise exc else: # configure params via command line here pass - - try: - tree = self.scm.get_tree(exp_rev) + logger.debug( + "Stashed experiment '%s' for future execution.", stash_rev + ) + return stash_rev + + def reproduce( + self, + revs: Optional[Iterable] = None, + keep_stash: Optional[bool] = True, + ): + """Reproduce the specified experiments. + + Args: + revs: If revs is not specified, all stashed experiments will be + reproduced. + keep_stash: If True, stashed experiments will be preserved if they + fail to reproduce successfully. + """ + stash_revs = self.stash_revs + + # to_run contains mapping of: + # input_rev: baseline_rev + # where input_rev contains the changes to execute (usually a stash + # commit) and baseline_rev is the baseline to compare output against. + # The final experiment commit will be branched from baseline_rev. + if revs is None: + to_run = { + rev: baseline_rev for rev, (_, baseline_rev) in stash_revs + } + else: + to_run = { + rev: stash_revs[rev][1] if rev in stash_revs else rev + for rev in revs + } + + # setup executors + executors = {} + for rev, baseline_rev in to_run.items(): + tree = self.scm.get_tree(rev) repro_args, repro_kwargs = self._unpack_args(tree) executor = LocalExecutor( tree, + baseline_rev, repro_args=repro_args, repro_kwargs=repro_kwargs, dvc_dir=self.dvc_dir, cache_dir=self.repo.cache.local.cache_dir, ) + executors[rev] = executor + + exec_results = self._reproduce(executors) + + if keep_stash: + # only drop successfully run stashed experiments + to_drop = sorted( + ( + stash_revs[rev][0] + for rev in exec_results + if rev in stash_revs + ), + reverse=True, + ) + else: + # drop all stashed experiments + to_drop = sorted( + (stash_revs[rev][0] for rev in to_run if rev in stash_revs), + reverse=True, + ) + for index in to_drop: + self.scm.repo.git.stash("drop", index) - self._run([executor]) - stages, unchanged = executor.result - self._collect_output(rev, executor) - executor.cleanup() - finally: - self.scm.repo.git.stash("drop") - - exp_rev = self._commit(stages + unchanged, rev=rev) - self.checkout_exp(exp_rev, force=True) - logger.info("Generated experiment '%s'.", exp_rev[:7]) - return stages + result = {} + for _, exp_result in exec_results.items(): + result.update(exp_result) + return result - def _run(self, executors: Iterable): - """Run the specified ExperimentExecutors in parallel. + def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: + """Run dvc repro for the specified ExperimentExecutors in parallel. - All experiments will be reproduced with the same `dvc repro` options - (via *args, **kwargs). + Returns dict containing successfully executed experiments. """ - # TODO: setup jobs - with ThreadPoolExecutor(max_workers=1) as thread_exec: - futures = [ - thread_exec.submit(executor.run) for executor in executors - ] - for _ in as_completed(futures): - # TODO: collect repro errors - pass + result = {} + + with ThreadPoolExecutor(max_workers=jobs) as thread_exec: + futures = { + thread_exec.submit(executor.reproduce): (rev, executor) + for rev, executor in executors.items() + } + for future in as_completed(futures): + rev, executor = futures[future] + exc = future.exception() + if exc: + logger.exception( + "Failed to reproduce experiment '%s'", rev + ) + else: + stages, unchanged = future.result() + logger.debug(f"ran exp based on {executor.baseline_rev}") + self._scm_checkout(executor.baseline_rev) + self._collect_output(executor.baseline_rev, executor) + remove(self.args_file) + try: + exp_rev = self._commit(stages + unchanged) + except UnchangedExperimentError: + logger.debug( + "Experiment '%s' identical to baseline '%s'", + rev, + executor.baseline_rev, + ) + exp_rev = executor.baseline_rev + logger.info("Reproduced experiment '%s'.", exp_rev[:7]) + result[rev] = {exp_rev: (stages, unchanged)} + executor.cleanup() + + return result def _collect_output(self, rev: str, executor: ExperimentExecutor): logger.debug("copying tmp output from '%s'", executor.tmp_dir) diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index bbe38532d6..a4398f8282 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -18,13 +18,18 @@ class ExperimentExecutor: """Base class for executing experiments in parallel. - Keyword args: + Args: + src_tree: source tree for this experiment. + baseline_rev: baseline revision that this experiment is derived from. + + Optional keyword args: repro_args: Args to be passed into reproduce. repro_kwargs: Keyword args to be passed into reproduce. """ - def __init__(self, src_tree: BaseTree, **kwargs): + def __init__(self, src_tree: BaseTree, baseline_rev: str, **kwargs): self.src_tree = src_tree + self.baseline_rev = baseline_rev self.repro_args = kwargs.pop("repro_args", []) self.repro_kwargs = kwargs.pop("repro_kwargs", {}) @@ -51,10 +56,12 @@ def unpack_repro_args(path, tree=None): class LocalExecutor(ExperimentExecutor): - def __init__(self, src_tree: BaseTree, **kwargs): + """Local machine exepriment executor.""" + + def __init__(self, src_tree: BaseTree, baseline_rev: str, **kwargs): dvc_dir = kwargs.pop("dvc_dir") cache_dir = kwargs.pop("cache_dir") - super().__init__(src_tree, **kwargs) + super().__init__(src_tree, baseline_rev, **kwargs) self.tmp_dir = TemporaryDirectory() logger.debug("Init local executor in dir '%s'.", self.tmp_dir) self.dvc_dir = os.path.join(self.tmp_dir.name, dvc_dir) @@ -69,8 +76,6 @@ def __init__(self, src_tree: BaseTree, **kwargs): self.tmp_dir.cleanup() raise self._config(cache_dir) - self._stages = None - self._unchanged = None def _config(self, cache_dir): local_config = os.path.join(self.dvc_dir, "config.local") @@ -89,10 +94,6 @@ def dvc(self): def path_info(self): return PathInfo(self.tmp_dir.name) - @property - def result(self): - return self._stages, self._unchanged - @contextmanager def chdir(self): cwd = os.getcwd() @@ -100,7 +101,8 @@ def chdir(self): yield os.chdir(cwd) - def run(self): + def reproduce(self): + """Run dvc repro and return the result.""" unchanged = [] def filter_pipeline(stage): @@ -115,8 +117,7 @@ def filter_pipeline(stage): on_unchanged=filter_pipeline, **self.repro_kwargs, ) - self._stages = stages - self._unchanged = unchanged + return stages, unchanged def cleanup(self): logger.debug("Removing tmpdir '%s'", self.tmp_dir) diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 1a728958bc..7c58140f4a 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -73,7 +73,7 @@ def reproduce( experiment = kwargs.pop("experiment", False) if experiment and self.experiments: try: - return self.experiments.new( + return self.experiments.reproduce_one( target=target, recursive=recursive, all_pipelines=all_pipelines, From 257120443c2467cc26829ecaf34201e49b51ecdf Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 14:03:21 +0900 Subject: [PATCH 08/17] experiments: use ProcessPoolExecutor --- dvc/repo/experiments/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 23f4c74b22..f6ea736a10 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -2,7 +2,7 @@ import os import re import tempfile -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor, as_completed from contextlib import contextmanager from typing import Iterable, Optional @@ -281,9 +281,9 @@ def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: """ result = {} - with ThreadPoolExecutor(max_workers=jobs) as thread_exec: + with ProcessPoolExecutor(max_workers=jobs) as workers: futures = { - thread_exec.submit(executor.reproduce): (rev, executor) + workers.submit(executor.reproduce): (rev, executor) for rev, executor in executors.items() } for future in as_completed(futures): From 3569bf738a57585032894c19e2e52667b5459bfa Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 14:06:16 +0900 Subject: [PATCH 09/17] experiments: add `dvc exp` alias for `dvc experiments` --- dvc/command/experiments.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dvc/command/experiments.py b/dvc/command/experiments.py index ab1daa3c7f..e09d646ecb 100644 --- a/dvc/command/experiments.py +++ b/dvc/command/experiments.py @@ -229,6 +229,7 @@ def add_parser(subparsers, parent_parser): experiments_parser = subparsers.add_parser( "experiments", parents=[parent_parser], + aliases=["exp"], description=append_doc_link(EXPERIMENTS_HELP, "experiments"), formatter_class=argparse.RawDescriptionHelpFormatter, ) From 9abb0a062cfd4c25de7025a800350c3ac613a079 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 15:15:21 +0900 Subject: [PATCH 10/17] experiments: add `dvc repro --experiment --queue` * `--queue` can be used to stage an experiment for future execution --- dvc/command/repro.py | 15 +++++++++++++++ dvc/repo/experiments/__init__.py | 26 +++++++++++++++++--------- dvc/repo/reproduce.py | 17 +++++++++++++++-- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/dvc/command/repro.py b/dvc/command/repro.py index 2b06ec03e5..ecad1c4a2d 100644 --- a/dvc/command/repro.py +++ b/dvc/command/repro.py @@ -41,6 +41,9 @@ def run(self): recursive=self.args.recursive, force_downstream=self.args.force_downstream, experiment=self.args.experiment, + queue=self.args.queue, + run_all=self.args.run_all, + jobs=self.args.jobs, ) if len(stages) == 0: @@ -174,4 +177,16 @@ def add_parser(subparsers, parent_parser): default=False, help=argparse.SUPPRESS, ) + repro_parser.add_argument( + "--queue", action="store_true", default=False, help=argparse.SUPPRESS + ) + repro_parser.add_argument( + "--run-all", + action="store_true", + default=False, + help=argparse.SUPPRESS, + ) + repro_parser.add_argument( + "-j", "--jobs", type=int, help=argparse.SUPPRESS, metavar="" + ) repro_parser.set_defaults(func=CmdRepro) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index f6ea736a10..768ab26baa 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -145,12 +145,11 @@ def _stash_exp(self, *args, **kwargs): remove(tmp) self._pack_args(*args, **kwargs) msg = f"{self.STASH_MSG_PREFIX}{rev}" - self.scm.repo.git.stash("push", "-m", msg) + self.scm.repo.git.stash("push", "--include-untracked", "-m", msg) return self.scm.resolve_rev("stash@{0}") def _pack_args(self, *args, **kwargs): ExperimentExecutor.pack_repro_args(self.args_file, *args, **kwargs) - self.scm.add(self.args_file) def _unpack_args(self, tree=None): args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) @@ -173,13 +172,21 @@ def _commit(self, stages, check_exists=True, branch=True): self.scm.commit(f"Add experiment {exp_name}") return self.scm.get_rev() - def reproduce_one(self, *args, **kwargs): + def reproduce_one(self, queue=False, **kwargs): """Reproduce and checkout a single experiment.""" stash_rev = self.new(**kwargs) + if queue: + logger.info( + "Queued experiment '%s' for future execution.", stash_rev + ) + return [] result = self.reproduce([stash_rev], keep_stash=False) - for exp_rev, (stages, _) in result.items(): - self.checkout_exp(exp_rev, force=True) - return stages + exp_rev, (stages, _) = result.items()[0] + self.checkout_exp(exp_rev, force=True) + return stages + + def reproduce_queued(self, **kwargs): + return [] def new(self, *args, workspace=True, **kwargs): """Create a new experiment. @@ -199,7 +206,7 @@ def new(self, *args, workspace=True, **kwargs): # configure params via command line here pass logger.debug( - "Stashed experiment '%s' for future execution.", stash_rev + "Stashed experiment '%s' for future execution.", stash_rev[:7] ) return stash_rev @@ -219,13 +226,14 @@ def reproduce( stash_revs = self.stash_revs # to_run contains mapping of: - # input_rev: baseline_rev + # input_rev: (stash_index, baseline_rev) # where input_rev contains the changes to execute (usually a stash # commit) and baseline_rev is the baseline to compare output against. # The final experiment commit will be branched from baseline_rev. if revs is None: to_run = { - rev: baseline_rev for rev, (_, baseline_rev) in stash_revs + rev: baseline_rev + for rev, (_, baseline_rev) in stash_revs.items() } else: to_run = { diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 7c58140f4a..bc38e2bdf1 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -71,13 +71,20 @@ def reproduce( ) experiment = kwargs.pop("experiment", False) + queue = kwargs.pop("queue", False) + run_all = kwargs.pop("run_all", False) + jobs = kwargs.pop("jobs", 1) if experiment and self.experiments: try: - return self.experiments.reproduce_one( + return _reproduce_experiments( + self, target=target, recursive=recursive, all_pipelines=all_pipelines, - **kwargs + queue=queue, + run_all=run_all, + jobs=jobs, + **kwargs, ) except UnchangedExperimentError: # If experiment contains no changes, just run regular repro @@ -109,6 +116,12 @@ def reproduce( return _reproduce_stages(active_graph, targets, **kwargs) +def _reproduce_experiments(repo, run_all=False, jobs=1, **kwargs): + if run_all: + return repo.experiments.reproduce_all(jobs=jobs) + return repo.experiments.reproduce_one(**kwargs) + + def _reproduce_stages( G, stages, downstream=False, single_item=False, on_unchanged=None, **kwargs ): From 0f3b7d3304f573db2e7baf09ddf92a31dacb6530 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 15:16:30 +0900 Subject: [PATCH 11/17] experiments: show queued (unexecuted) experiments in `dvc exp show` --- dvc/command/experiments.py | 9 +++++++-- dvc/repo/experiments/show.py | 23 +++++++++++++++++------ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/dvc/command/experiments.py b/dvc/command/experiments.py index e09d646ecb..a8d591f4ad 100644 --- a/dvc/command/experiments.py +++ b/dvc/command/experiments.py @@ -48,6 +48,10 @@ def _round(val): return val def _extend(row, names, items): + if not items: + row.extend(["-"] * len(names)) + return + for fname, item in items: if isinstance(item, dict): item = flatten(item, ".") @@ -62,13 +66,14 @@ def _extend(row, names, items): for i, (rev, exp) in enumerate(experiments.items()): row = [] style = None + queued = "*" if exp.get("queued", False) else "" if rev == "baseline": row.append(f"{base_rev}") style = "bold" elif i < len(experiments) - 1: - row.append(f"├── {rev[:7]}") + row.append(f"├── {queued}{rev[:7]}") else: - row.append(f"└── {rev[:7]}") + row.append(f"└── {queued}{rev[:7]}") _extend(row, metric_names, exp.get("metrics", {}).items()) _extend(row, param_names, exp.get("params", {}).items()) diff --git a/dvc/repo/experiments/show.py b/dvc/repo/experiments/show.py index bbb2c706ff..da84079e7b 100644 --- a/dvc/repo/experiments/show.py +++ b/dvc/repo/experiments/show.py @@ -12,7 +12,7 @@ EXP_RE = re.compile(r"(?P[a-f0-9]{7})-(?P[a-f0-9]+)") -def _collect_experiment(repo, branch): +def _collect_experiment(repo, branch, stash=False): res = defaultdict(dict) for rev in repo.brancher(revs=[branch]): configs = _collect_configs(repo) @@ -20,9 +20,10 @@ def _collect_experiment(repo, branch): if params: res["params"] = params - metrics = _collect_metrics(repo, None, False) - vals = _read_metrics(repo, metrics, rev) - if vals: + res["queued"] = stash + if not stash: + metrics = _collect_metrics(repo, None, False) + vals = _read_metrics(repo, metrics, rev) res["metrics"] = vals return res @@ -37,8 +38,9 @@ def show( if revs is None: revs = [repo.scm.get_rev()] - revs = set( - repo.brancher( + revs = OrderedDict( + (rev, None) + for rev in repo.brancher( revs=revs, all_branches=all_branches, all_tags=all_tags, @@ -49,6 +51,7 @@ def show( for rev in revs: res[rev]["baseline"] = _collect_experiment(repo, rev) + # collect reproduced experiments for exp_branch in repo.experiments.scm.list_branches(): m = re.match(EXP_RE, exp_branch) if m: @@ -61,4 +64,12 @@ def show( ) res[rev][exp_rev] = experiment + # collect queued (not yet reproduced) experiments + for stash_rev, (_, baseline_rev) in repo.experiments.stash_revs.items(): + with repo.experiments.chdir(): + experiment = _collect_experiment( + repo.experiments.exp_dvc, stash_rev, stash=True + ) + res[baseline_rev][stash_rev] = experiment + return res From 9214934b5e18fe7a3285b44de0241ff85daa270b Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 15:42:46 +0900 Subject: [PATCH 12/17] revert ProcessPoolExecutor change --- dvc/repo/experiments/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 768ab26baa..05e0dd4ca0 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -2,7 +2,7 @@ import os import re import tempfile -from concurrent.futures import ProcessPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import contextmanager from typing import Iterable, Optional @@ -289,7 +289,7 @@ def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: """ result = {} - with ProcessPoolExecutor(max_workers=jobs) as workers: + with ThreadPoolExecutor(max_workers=jobs) as workers: futures = { workers.submit(executor.reproduce): (rev, executor) for rev, executor in executors.items() From 491626397c1935dab31edaca09f003444a03d216 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 15:44:06 +0900 Subject: [PATCH 13/17] experiments: add `dvc repro --run-all [--jobs]` * `--run-all` can be used to run all queued experiments in parallel --- dvc/repo/experiments/__init__.py | 39 ++++++++++++++++++++++---------- dvc/repo/reproduce.py | 4 ++-- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 05e0dd4ca0..9666a91946 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -79,7 +79,9 @@ def args_file(self): @property def stash_reflog(self): - return self.scm.repo.refs["refs/stash"].log() + if "refs/stash" in self.scm.repo.refs: + return self.scm.repo.refs["refs/stash"].log() + return [] @property def stash_revs(self): @@ -145,11 +147,12 @@ def _stash_exp(self, *args, **kwargs): remove(tmp) self._pack_args(*args, **kwargs) msg = f"{self.STASH_MSG_PREFIX}{rev}" - self.scm.repo.git.stash("push", "--include-untracked", "-m", msg) + self.scm.repo.git.stash("push", "-m", msg) return self.scm.resolve_rev("stash@{0}") def _pack_args(self, *args, **kwargs): ExperimentExecutor.pack_repro_args(self.args_file, *args, **kwargs) + self.scm.add(self.args_file) def _unpack_args(self, tree=None): args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) @@ -180,12 +183,23 @@ def reproduce_one(self, queue=False, **kwargs): "Queued experiment '%s' for future execution.", stash_rev ) return [] - result = self.reproduce([stash_rev], keep_stash=False) - exp_rev, (stages, _) = result.items()[0] - self.checkout_exp(exp_rev, force=True) - return stages + results = self.reproduce([stash_rev], keep_stash=False) + if results: + exp_rev, (stages, _) = results.items()[0] + self.checkout_exp(exp_rev, force=True) + return stages + return [] def reproduce_queued(self, **kwargs): + results = self.reproduce(**kwargs) + if results: + revs = [f"{rev[:7]}" for rev in results] + logger.info( + "Successfully reproduced experiments '%s'.\n" + "Use `dvc exp checkout ` to apply the results of " + "a specific experiment to your workspace.", + ", ".join(revs), + ) return [] def new(self, *args, workspace=True, **kwargs): @@ -214,6 +228,7 @@ def reproduce( self, revs: Optional[Iterable] = None, keep_stash: Optional[bool] = True, + **kwargs, ): """Reproduce the specified experiments. @@ -256,7 +271,7 @@ def reproduce( ) executors[rev] = executor - exec_results = self._reproduce(executors) + exec_results = self._reproduce(executors, **kwargs) if keep_stash: # only drop successfully run stashed experiments @@ -297,11 +312,7 @@ def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: for future in as_completed(futures): rev, executor = futures[future] exc = future.exception() - if exc: - logger.exception( - "Failed to reproduce experiment '%s'", rev - ) - else: + if exc is None: stages, unchanged = future.result() logger.debug(f"ran exp based on {executor.baseline_rev}") self._scm_checkout(executor.baseline_rev) @@ -318,6 +329,10 @@ def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: exp_rev = executor.baseline_rev logger.info("Reproduced experiment '%s'.", exp_rev[:7]) result[rev] = {exp_rev: (stages, unchanged)} + else: + logger.exception( + "Failed to reproduce experiment '%s'", rev + ) executor.cleanup() return result diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index bc38e2bdf1..f94f9dd89f 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -74,7 +74,7 @@ def reproduce( queue = kwargs.pop("queue", False) run_all = kwargs.pop("run_all", False) jobs = kwargs.pop("jobs", 1) - if experiment and self.experiments: + if (experiment or run_all) and self.experiments: try: return _reproduce_experiments( self, @@ -118,7 +118,7 @@ def reproduce( def _reproduce_experiments(repo, run_all=False, jobs=1, **kwargs): if run_all: - return repo.experiments.reproduce_all(jobs=jobs) + return repo.experiments.reproduce_queued(jobs=jobs) return repo.experiments.reproduce_one(**kwargs) From ccaa0983ad9278b9aa319bf02c69b35502688318 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 15:49:56 +0900 Subject: [PATCH 14/17] cleanup output --- dvc/repo/experiments/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 9666a91946..4514376f23 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -180,7 +180,7 @@ def reproduce_one(self, queue=False, **kwargs): stash_rev = self.new(**kwargs) if queue: logger.info( - "Queued experiment '%s' for future execution.", stash_rev + "Queued experiment '%s' for future execution.", stash_rev[:7] ) return [] results = self.reproduce([stash_rev], keep_stash=False) @@ -195,7 +195,7 @@ def reproduce_queued(self, **kwargs): if results: revs = [f"{rev[:7]}" for rev in results] logger.info( - "Successfully reproduced experiments '%s'.\n" + "Successfully reproduced experiment(s) '%s'.\n" "Use `dvc exp checkout ` to apply the results of " "a specific experiment to your workspace.", ", ".join(revs), From bdf63565a13c4ab55b4f89d54c2a58b2a7a65bb5 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 18:17:37 +0900 Subject: [PATCH 15/17] experiments: use ProcessPoolExecutor * fix returning unpicklable objects error --- dvc/repo/experiments/__init__.py | 52 +++++++++++++++++--------------- dvc/repo/experiments/executor.py | 25 +++++++++------ 2 files changed, 42 insertions(+), 35 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 4514376f23..95a11e32ed 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -2,7 +2,7 @@ import os import re import tempfile -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor, as_completed from contextlib import contextmanager from typing import Iterable, Optional @@ -18,6 +18,13 @@ logger = logging.getLogger(__name__) +def hash_exp(stages): + exp_data = {} + for stage in stages: + exp_data.update(to_lockfile(stage)) + return dict_sha256(exp_data) + + class UnchangedExperimentError(DvcException): def __init__(self, rev): super().__init__("Experiment identical to baseline '{rev[:7]}'.") @@ -92,13 +99,6 @@ def stash_revs(self): revs[entry.newhexsha] = (i, m.group("baseline_rev")) return revs - @staticmethod - def exp_hash(stages): - exp_data = {} - for stage in stages: - exp_data.update(to_lockfile(stage)) - return dict_sha256(exp_data) - @contextmanager def chdir(self): cwd = os.getcwd() @@ -158,13 +158,12 @@ def _unpack_args(self, tree=None): args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) return ExperimentExecutor.unpack_repro_args(args_file, tree=tree) - def _commit(self, stages, check_exists=True, branch=True): + def _commit(self, exp_hash, check_exists=True, branch=True): """Commit stages as an experiment and return the commit SHA.""" if not self.scm.is_dirty(): raise UnchangedExperimentError(self.scm.get_rev()) rev = self.scm.get_rev() - hash_ = self.exp_hash(stages) - exp_name = f"{rev[:7]}-{hash_}" + exp_name = f"{rev[:7]}-{exp_hash}" if branch: if check_exists and exp_name in self.scm.list_branches(): logger.debug("Using existing experiment branch '%s'", exp_name) @@ -182,13 +181,11 @@ def reproduce_one(self, queue=False, **kwargs): logger.info( "Queued experiment '%s' for future execution.", stash_rev[:7] ) - return [] + return [stash_rev] results = self.reproduce([stash_rev], keep_stash=False) - if results: - exp_rev, (stages, _) = results.items()[0] + for exp_rev in results: self.checkout_exp(exp_rev, force=True) - return stages - return [] + return results def reproduce_queued(self, **kwargs): results = self.reproduce(**kwargs) @@ -200,7 +197,7 @@ def reproduce_queued(self, **kwargs): "a specific experiment to your workspace.", ", ".join(revs), ) - return [] + return results def new(self, *args, workspace=True, **kwargs): """Create a new experiment. @@ -304,22 +301,27 @@ def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: """ result = {} - with ThreadPoolExecutor(max_workers=jobs) as workers: - futures = { - workers.submit(executor.reproduce): (rev, executor) - for rev, executor in executors.items() - } + with ProcessPoolExecutor(max_workers=jobs) as workers: + futures = {} + for rev, executor in executors.items(): + future = workers.submit( + executor.reproduce, + executor.dvc_dir, + cwd=executor.dvc.root_dir, + **executor.repro_kwargs, + ) + futures[future] = (rev, executor) for future in as_completed(futures): rev, executor = futures[future] exc = future.exception() if exc is None: - stages, unchanged = future.result() + exp_hash = future.result() logger.debug(f"ran exp based on {executor.baseline_rev}") self._scm_checkout(executor.baseline_rev) self._collect_output(executor.baseline_rev, executor) remove(self.args_file) try: - exp_rev = self._commit(stages + unchanged) + exp_rev = self._commit(exp_hash) except UnchangedExperimentError: logger.debug( "Experiment '%s' identical to baseline '%s'", @@ -328,7 +330,7 @@ def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: ) exp_rev = executor.baseline_rev logger.info("Reproduced experiment '%s'.", exp_rev[:7]) - result[rev] = {exp_rev: (stages, unchanged)} + result[rev] = {exp_rev: exp_hash} else: logger.exception( "Failed to reproduce experiment '%s'", rev diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index a4398f8282..67eee6a9b0 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -101,23 +101,28 @@ def chdir(self): yield os.chdir(cwd) - def reproduce(self): + @staticmethod + def reproduce(dvc_dir, cwd=None, **kwargs): """Run dvc repro and return the result.""" + from dvc.repo import Repo + from dvc.repo.experiments import hash_exp + unchanged = [] def filter_pipeline(stage): if isinstance(stage, PipelineStage): unchanged.append(stage) - logger.debug("Running repro in '%s'", self.tmp_dir) - with self.chdir(): - self.dvc.checkout() - stages = self.dvc.reproduce( - *self.repro_args, - on_unchanged=filter_pipeline, - **self.repro_kwargs, - ) - return stages, unchanged + if cwd: + os.chdir(cwd) + else: + cwd = os.getcwd() + + logger.debug("Running repro in '%s'", cwd) + dvc = Repo(dvc_dir) + dvc.checkout() + stages = dvc.reproduce(on_unchanged=filter_pipeline, **kwargs) + return hash_exp(stages + unchanged) def cleanup(self): logger.debug("Removing tmpdir '%s'", self.tmp_dir) From 2fc3b4a4c5d55b0b9cad5ece8d7acb889c01957a Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Jul 2020 23:09:32 +0900 Subject: [PATCH 16/17] update tests --- tests/func/experiments/test_show.py | 1 + tests/unit/command/test_repro.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/tests/func/experiments/test_show.py b/tests/func/experiments/test_show.py index bfcb595e49..9571a8adb5 100644 --- a/tests/func/experiments/test_show.py +++ b/tests/func/experiments/test_show.py @@ -15,5 +15,6 @@ def test_show_simple(tmp_dir, scm, dvc): "baseline": { "metrics": {"metrics.yaml": {"foo": 1}}, "params": {"params.yaml": {"foo": 1}}, + "queued": False, } } diff --git a/tests/unit/command/test_repro.py b/tests/unit/command/test_repro.py index fbcf1271fc..0b2302ef74 100644 --- a/tests/unit/command/test_repro.py +++ b/tests/unit/command/test_repro.py @@ -15,6 +15,9 @@ "recursive": False, "force_downstream": False, "experiment": False, + "queue": False, + "run_all": False, + "jobs": None, } From 66df566d9ef733c4b476666ff177d1bd665664d3 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 29 Jul 2020 15:24:20 -0700 Subject: [PATCH 17/17] fix windows cleanup issue * on windows tempdir cannot be removed if we are chdir'd into that directory --- dvc/repo/experiments/__init__.py | 8 -------- dvc/repo/experiments/executor.py | 26 ++++++++++++++------------ 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 95a11e32ed..fe899762d6 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -3,7 +3,6 @@ import re import tempfile from concurrent.futures import ProcessPoolExecutor, as_completed -from contextlib import contextmanager from typing import Iterable, Optional from funcy import cached_property @@ -99,13 +98,6 @@ def stash_revs(self): revs[entry.newhexsha] = (i, m.group("baseline_rev")) return revs - @contextmanager - def chdir(self): - cwd = os.getcwd() - os.chdir(self.exp_dvc.root_dir) - yield - os.chdir(cwd) - def _init_clone(self): src_dir = self.repo.scm.root_dir logger.debug("Initializing experiments clone") diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py index 67eee6a9b0..494c389f33 100644 --- a/dvc/repo/experiments/executor.py +++ b/dvc/repo/experiments/executor.py @@ -1,7 +1,6 @@ import logging import os import pickle -from contextlib import contextmanager from tempfile import TemporaryDirectory from funcy import cached_property @@ -94,13 +93,6 @@ def dvc(self): def path_info(self): return PathInfo(self.tmp_dir.name) - @contextmanager - def chdir(self): - cwd = os.getcwd() - os.chdir(self.dvc.root_dir) - yield - os.chdir(cwd) - @staticmethod def reproduce(dvc_dir, cwd=None, **kwargs): """Run dvc repro and return the result.""" @@ -114,14 +106,24 @@ def filter_pipeline(stage): unchanged.append(stage) if cwd: + old_cwd = os.getcwd() os.chdir(cwd) else: + old_cwd = None cwd = os.getcwd() - logger.debug("Running repro in '%s'", cwd) - dvc = Repo(dvc_dir) - dvc.checkout() - stages = dvc.reproduce(on_unchanged=filter_pipeline, **kwargs) + try: + logger.debug("Running repro in '%s'", cwd) + dvc = Repo(dvc_dir) + dvc.checkout() + stages = dvc.reproduce(on_unchanged=filter_pipeline, **kwargs) + finally: + if old_cwd is not None: + os.chdir(old_cwd) + + # ideally we would return stages here like a normal repro() call, but + # stages is not currently picklable and cannot be returned across + # multiprocessing calls return hash_exp(stages + unchanged) def cleanup(self):