From 455c6115e1d86764f2b5a7fc25a044f2e4589899 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 20 May 2020 18:48:33 +0545 Subject: [PATCH 1/7] log: streamline/make it readable --- dvc/dvcfile.py | 21 ++++++++++++++++----- dvc/output/base.py | 8 +++----- dvc/repo/reproduce.py | 6 +++++- dvc/repo/run.py | 3 +-- dvc/stage/__init__.py | 29 ++++++++++++++++------------- dvc/stage/run.py | 19 +++++++++++++++---- 6 files changed, 56 insertions(+), 30 deletions(-) diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py index 51e74e1adc..0e7ac0bcb1 100644 --- a/dvc/dvcfile.py +++ b/dvc/dvcfile.py @@ -182,12 +182,12 @@ def dump(self, stage, update_pipeline=False, no_lock=False): assert isinstance(stage, PipelineStage) check_dvc_filename(self.path) - if not no_lock: - self._dump_lockfile(stage) - if update_pipeline and not stage.is_data_source: self._dump_pipeline_file(stage) + if not no_lock: + self._dump_lockfile(stage) + def _dump_lockfile(self, stage): self._lockfile.dump(stage) @@ -197,6 +197,7 @@ def _dump_pipeline_file(self, stage): with open(self.path) as fd: data = parse_stage_for_update(fd.read(), self.path) else: + logger.info("'%s' does not exist, creating…", self.relpath) open(self.path, "w+").close() data["stages"] = data.get("stages", {}) @@ -207,6 +208,9 @@ def _dump_pipeline_file(self, stage): else: data["stages"].update(stage_data) + logger.info( + "Adding stage '%s' to '%s'…", stage.name, self.relpath, + ) dump_stage_file(self.path, data) self.repo.scm.track_file(relpath(self.path)) @@ -250,15 +254,22 @@ def load(self): def dump(self, stage, **kwargs): stage_data = serialize.to_lockfile(stage) if not self.exists(): + modified = True + logger.info("Generating lock file…") data = stage_data open(self.path, "w+").close() else: with self.repo.tree.open(self.path, "r") as fd: data = parse_stage_for_update(fd.read(), self.path) + modified = data.get(stage.name, {}) != stage_data.get( + stage.name, {} + ) + if modified: + logger.info("Updating lock file…") data.update(stage_data) - dump_stage_file(self.path, data) - self.repo.scm.track_file(relpath(self.path)) + if modified: + self.repo.scm.track_file(self.relpath) class Dvcfile: diff --git a/dvc/output/base.py b/dvc/output/base.py index 61fee818a5..8cbae62efe 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -245,17 +245,15 @@ def save(self): if self.metric or self.plot: self.verify_metric() if not self.IS_DEPENDENCY: - logger.info( - "Output '{}' doesn't use cache. Skipping saving.".format( - self - ) + logger.debug( + "Output '%s' doesn't use cache. Skipping saving.", self ) return assert not self.IS_DEPENDENCY if not self.changed(): - logger.info(f"Output '{self}' didn't change. Skipping saving.") + logger.debug("Output '%s' didn't change. Skipping saving.", self) return self.info = self.save_info() diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 78a302f3be..b474b6a8e6 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -164,7 +164,11 @@ def _reproduce_stages( force_downstream = kwargs.pop("force_downstream", False) result = [] - for stage in pipeline: + for idx, stage in enumerate(pipeline): + if idx != 0: + # Cosmetic newline + logger.info("") + try: ret = _reproduce_stage(stage, **kwargs) diff --git a/dvc/repo/run.py b/dvc/repo/run.py index c17be0555b..b2687746c0 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -82,8 +82,7 @@ def run(self, fname=None, no_exec=False, single_stage=False, **kwargs): raise OutputDuplicationError(exc.output, set(exc.stages) - {stage}) if no_exec: - for out in stage.outs: - out.ignore() + stage.ignore_outs() else: stage.run( no_commit=kwargs.get("no_commit", False), diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index a760084139..e8ec1207aa 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -77,7 +77,7 @@ def create_stage(cls, repo, path, **kwargs): logger.warning("Build cache is ignored when persisting outputs.") if not ignore_run_cache and stage.can_be_skipped: - logger.info("Stage is cached, skipping.") + logger.info("Stage is cached, skipping…") return None return stage @@ -209,10 +209,11 @@ def changed_deps(self): return False if self.is_callback: - logger.warning( - '{stage} is a "callback" stage ' + logger.debug( + '%s is a "callback" stage ' "(has a command and no dependencies) and thus always " - "considered as changed.".format(stage=self) + "considered as changed.", + self, ) return True @@ -248,9 +249,9 @@ def changed_outs(self): return False - def changed_stage(self, warn=False): + def changed_stage(self): changed = self.md5 != self.compute_md5() - if changed and warn: + if changed: logger.debug(self._changed_stage_entry()) return changed @@ -259,14 +260,12 @@ def changed(self): is_changed = ( # Short-circuit order: stage md5 is fast, # deps are expected to change - self.changed_stage(warn=True) + self.changed_stage() or self.changed_deps() or self.changed_outs() ) if is_changed: - logger.info("%s changed.", self) - else: - logger.info("%s didn't change.", self) + logger.debug("%s changed.", self) return is_changed @rwlocked(write=["outs"]) @@ -294,8 +293,8 @@ def remove(self, force=False, remove_outs=True): @rwlocked(read=["deps"], write=["outs"]) def reproduce(self, interactive=False, **kwargs): - if not (kwargs.get("force", False) or self.changed()): + logger.info("Stage '%s' didn't change, skipping…", self.addressing) return None msg = ( @@ -379,6 +378,10 @@ def save_outs(self): for out in self.outs: out.save() + def ignore_outs(self): + for out in self.outs: + out.ignore() + @staticmethod def _changed_entries(entries): return [str(entry) for entry in entries if entry.changed_checksum()] @@ -552,8 +555,8 @@ def _status_stage(self, ret): if self.cmd_changed: ret.append("changed command") - def changed_stage(self, warn=False): - if self.cmd_changed and warn: + def changed_stage(self): + if self.cmd_changed: logger.debug(self._changed_stage_entry()) return self.cmd_changed diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 54b1bfbc79..ebac0ed612 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -79,11 +79,14 @@ def cmd_run(stage, *args, **kwargs): def _is_cached(stage): - return ( + cached = ( not stage.is_callback and not stage.always_changed and stage.already_cached() ) + if cached: + logger.info("Stage '%s' is cached, skipping run…", stage.addressing) + return cached def restored_from_cache(stage): @@ -93,7 +96,12 @@ def restored_from_cache(stage): return False # restore stage from build cache stage_cache.restore(stage) - return stage.outs_cached() + restored = stage.outs_cached() + if restored: + logger.info( + "Restored stage %s from run-cache, skipping…", stage.addressing + ) + return restored def run_stage(stage, dry=False, force=False, run_cache=False): @@ -102,10 +110,13 @@ def run_stage(stage, dry=False, force=False, run_cache=False): run_cache and restored_from_cache(stage) ) if stage_cached: - logger.info("Stage is cached, skipping.") stage.checkout() return - logger.info("Running command:\n\t%s", stage.cmd) + callback_str = "callback " if stage.is_callback else "" + logger.info( + "Running %s" "stage %s with command:", callback_str, stage.addressing, + ) + logger.info("\t%s", stage.cmd) if not dry: cmd_run(stage) From 39ec8c2916a45b7a329803c246d017bd595fad69 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 20 May 2020 18:55:54 +0545 Subject: [PATCH 2/7] Remove 'run' from log message --- dvc/stage/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index ebac0ed612..1174afc065 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -85,7 +85,7 @@ def _is_cached(stage): and stage.already_cached() ) if cached: - logger.info("Stage '%s' is cached, skipping run…", stage.addressing) + logger.info("Stage '%s' is cached, skipping…", stage.addressing) return cached From e43af6aea7b4664b7a7b494bc78d1054638de025 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 20 May 2020 19:10:54 +0545 Subject: [PATCH 3/7] fix args passing in imports --- dvc/stage/imports.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dvc/stage/imports.py b/dvc/stage/imports.py index 6e7dded379..957e018e36 100644 --- a/dvc/stage/imports.py +++ b/dvc/stage/imports.py @@ -23,11 +23,7 @@ def sync_import(stage, dry=False, force=False): if dry: return - if ( - not force - and not stage.changed_stage(warn=True) - and stage.already_cached() - ): + if not force and not stage.changed_stage() and stage.already_cached(): stage.outs[0].checkout() else: stage.deps[0].download(stage.outs[0]) From 892b8d43c610fcb6a27634d265ed4f2be115edc9 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 20 May 2020 19:46:56 +0545 Subject: [PATCH 4/7] tests: fix test --- tests/unit/stage/test_run.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/stage/test_run.py b/tests/unit/stage/test_run.py index b0595051a1..d5a88460b9 100644 --- a/tests/unit/stage/test_run.py +++ b/tests/unit/stage/test_run.py @@ -6,5 +6,9 @@ def test_run_stage_dry(caplog): with caplog.at_level(level=logging.INFO, logger="dvc"): - run_stage(Stage(None, cmd="mycmd arg1 arg2"), dry=True) - assert caplog.messages == ["Running command:\n\tmycmd arg1 arg2"] + stage = Stage(None, "stage.dvc", cmd="mycmd arg1 arg2") + run_stage(stage, dry=True) + assert caplog.messages == [ + "Running callback stage 'stage.dvc' with command:", + "\t" + "mycmd arg1 arg2", + ] From 72f8d9e16e9141bb5bb5031421b97381ef9c7a02 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 20 May 2020 19:47:24 +0545 Subject: [PATCH 5/7] run: Add quotes around stage name --- dvc/stage/run.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 1174afc065..c2cf17f79c 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -115,7 +115,9 @@ def run_stage(stage, dry=False, force=False, run_cache=False): callback_str = "callback " if stage.is_callback else "" logger.info( - "Running %s" "stage %s with command:", callback_str, stage.addressing, + "Running %s" "stage '%s' with command:", + callback_str, + stage.addressing, ) logger.info("\t%s", stage.cmd) if not dry: From fce08fa49a0a9af045229d90a3c72813a452a4f1 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 20 May 2020 19:58:10 +0545 Subject: [PATCH 6/7] run: add quotes for run-cache restore log --- dvc/stage/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index c2cf17f79c..a39137ff9b 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -99,7 +99,7 @@ def restored_from_cache(stage): restored = stage.outs_cached() if restored: logger.info( - "Restored stage %s from run-cache, skipping…", stage.addressing + "Restored stage '%s' from run-cache, skipping…", stage.addressing ) return restored From eedd9cd236736b76b2686fdaf1e05ea1fbbd4007 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Thu, 21 May 2020 11:00:43 +0545 Subject: [PATCH 7/7] address @dmitry's comments --- dvc/dvcfile.py | 8 ++++---- dvc/repo/reproduce.py | 7 ++++--- dvc/stage/__init__.py | 4 ++-- dvc/stage/run.py | 7 +++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py index 0e7ac0bcb1..43e6cbffb7 100644 --- a/dvc/dvcfile.py +++ b/dvc/dvcfile.py @@ -197,7 +197,7 @@ def _dump_pipeline_file(self, stage): with open(self.path) as fd: data = parse_stage_for_update(fd.read(), self.path) else: - logger.info("'%s' does not exist, creating…", self.relpath) + logger.info("Creating '%s'", self.relpath) open(self.path, "w+").close() data["stages"] = data.get("stages", {}) @@ -209,7 +209,7 @@ def _dump_pipeline_file(self, stage): data["stages"].update(stage_data) logger.info( - "Adding stage '%s' to '%s'…", stage.name, self.relpath, + "Adding stage '%s' to '%s'", stage.name, self.relpath, ) dump_stage_file(self.path, data) self.repo.scm.track_file(relpath(self.path)) @@ -255,7 +255,7 @@ def dump(self, stage, **kwargs): stage_data = serialize.to_lockfile(stage) if not self.exists(): modified = True - logger.info("Generating lock file…") + logger.info("Generating lock file '%s'", self.relpath) data = stage_data open(self.path, "w+").close() else: @@ -265,7 +265,7 @@ def dump(self, stage, **kwargs): stage.name, {} ) if modified: - logger.info("Updating lock file…") + logger.info("Updating lock file '%s'", self.relpath) data.update(stage_data) dump_stage_file(self.path, data) if modified: diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index b474b6a8e6..de36f93c8d 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -164,9 +164,10 @@ def _reproduce_stages( force_downstream = kwargs.pop("force_downstream", False) result = [] - for idx, stage in enumerate(pipeline): - if idx != 0: - # Cosmetic newline + # `ret` is used to add a cosmetic newline. + ret = [] + for stage in pipeline: + if ret: logger.info("") try: diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index e8ec1207aa..8a20381401 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -77,7 +77,7 @@ def create_stage(cls, repo, path, **kwargs): logger.warning("Build cache is ignored when persisting outputs.") if not ignore_run_cache and stage.can_be_skipped: - logger.info("Stage is cached, skipping…") + logger.info("Stage is cached, skipping") return None return stage @@ -294,7 +294,7 @@ def remove(self, force=False, remove_outs=True): @rwlocked(read=["deps"], write=["outs"]) def reproduce(self, interactive=False, **kwargs): if not (kwargs.get("force", False) or self.changed()): - logger.info("Stage '%s' didn't change, skipping…", self.addressing) + logger.info("Stage '%s' didn't change, skipping", self.addressing) return None msg = ( diff --git a/dvc/stage/run.py b/dvc/stage/run.py index a39137ff9b..53bab38d21 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -85,7 +85,7 @@ def _is_cached(stage): and stage.already_cached() ) if cached: - logger.info("Stage '%s' is cached, skipping…", stage.addressing) + logger.info("Stage '%s' is cached", stage.addressing) return cached @@ -98,9 +98,7 @@ def restored_from_cache(stage): stage_cache.restore(stage) restored = stage.outs_cached() if restored: - logger.info( - "Restored stage '%s' from run-cache, skipping…", stage.addressing - ) + logger.info("Restored stage '%s' from run-cache", stage.addressing) return restored @@ -110,6 +108,7 @@ def run_stage(stage, dry=False, force=False, run_cache=False): run_cache and restored_from_cache(stage) ) if stage_cached: + logger.info("Skipping run, checking out outputs") stage.checkout() return