Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions dvc/dvcfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ def dump(self, stage, update_pipeline=False, no_lock=False):
assert isinstance(stage, PipelineStage)
check_dvc_filename(self.path)

if not no_lock:
self._dump_lockfile(stage)

if update_pipeline and not stage.is_data_source:
self._dump_pipeline_file(stage)

if not no_lock:
self._dump_lockfile(stage)

def _dump_lockfile(self, stage):
self._lockfile.dump(stage)

Expand All @@ -197,6 +197,7 @@ def _dump_pipeline_file(self, stage):
with open(self.path) as fd:
data = parse_stage_for_update(fd.read(), self.path)
else:
logger.info("Creating '%s'", self.relpath)
open(self.path, "w+").close()

data["stages"] = data.get("stages", {})
Expand All @@ -207,6 +208,9 @@ def _dump_pipeline_file(self, stage):
else:
data["stages"].update(stage_data)

logger.info(
"Adding stage '%s' to '%s'", stage.name, self.relpath,
)
dump_stage_file(self.path, data)
self.repo.scm.track_file(relpath(self.path))

Expand Down Expand Up @@ -250,15 +254,22 @@ def load(self):
def dump(self, stage, **kwargs):
stage_data = serialize.to_lockfile(stage)
if not self.exists():
modified = True
logger.info("Generating lock file '%s'", self.relpath)
data = stage_data
open(self.path, "w+").close()
else:
with self.repo.tree.open(self.path, "r") as fd:
data = parse_stage_for_update(fd.read(), self.path)
modified = data.get(stage.name, {}) != stage_data.get(
stage.name, {}
)
if modified:
logger.info("Updating lock file '%s'", self.relpath)
data.update(stage_data)

dump_stage_file(self.path, data)
self.repo.scm.track_file(relpath(self.path))
if modified:
self.repo.scm.track_file(self.relpath)


class Dvcfile:
Expand Down
8 changes: 3 additions & 5 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,15 @@ def save(self):
if self.metric or self.plot:
self.verify_metric()
if not self.IS_DEPENDENCY:
logger.info(
"Output '{}' doesn't use cache. Skipping saving.".format(
self
)
logger.debug(
"Output '%s' doesn't use cache. Skipping saving.", self
)
return

assert not self.IS_DEPENDENCY

if not self.changed():
logger.info(f"Output '{self}' didn't change. Skipping saving.")
logger.debug("Output '%s' didn't change. Skipping saving.", self)
return

self.info = self.save_info()
Expand Down
5 changes: 5 additions & 0 deletions dvc/repo/reproduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ def _reproduce_stages(

force_downstream = kwargs.pop("force_downstream", False)
result = []
# `ret` is used to add a cosmetic newline.
ret = []
for stage in pipeline:
if ret:
logger.info("")

try:
ret = _reproduce_stage(stage, **kwargs)

Expand Down
3 changes: 1 addition & 2 deletions dvc/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ def run(self, fname=None, no_exec=False, single_stage=False, **kwargs):
raise OutputDuplicationError(exc.output, set(exc.stages) - {stage})

if no_exec:
for out in stage.outs:
out.ignore()
stage.ignore_outs()
else:
stage.run(
no_commit=kwargs.get("no_commit", False),
Expand Down
29 changes: 16 additions & 13 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def create_stage(cls, repo, path, **kwargs):
logger.warning("Build cache is ignored when persisting outputs.")

if not ignore_run_cache and stage.can_be_skipped:
logger.info("Stage is cached, skipping.")
logger.info("Stage is cached, skipping")
return None

return stage
Expand Down Expand Up @@ -209,10 +209,11 @@ def changed_deps(self):
return False

if self.is_callback:
logger.warning(
'{stage} is a "callback" stage '
logger.debug(
'%s is a "callback" stage '
"(has a command and no dependencies) and thus always "
"considered as changed.".format(stage=self)
"considered as changed.",
self,
)
return True

Expand Down Expand Up @@ -248,9 +249,9 @@ def changed_outs(self):

return False

def changed_stage(self, warn=False):
def changed_stage(self):
changed = self.md5 != self.compute_md5()
if changed and warn:
if changed:
logger.debug(self._changed_stage_entry())
return changed

Expand All @@ -259,14 +260,12 @@ def changed(self):
is_changed = (
# Short-circuit order: stage md5 is fast,
# deps are expected to change
self.changed_stage(warn=True)
self.changed_stage()
or self.changed_deps()
or self.changed_outs()
)
if is_changed:
logger.info("%s changed.", self)
else:
logger.info("%s didn't change.", self)
logger.debug("%s changed.", self)
return is_changed

@rwlocked(write=["outs"])
Expand Down Expand Up @@ -294,8 +293,8 @@ def remove(self, force=False, remove_outs=True):

@rwlocked(read=["deps"], write=["outs"])
def reproduce(self, interactive=False, **kwargs):

if not (kwargs.get("force", False) or self.changed()):
logger.info("Stage '%s' didn't change, skipping", self.addressing)
return None

msg = (
Expand Down Expand Up @@ -379,6 +378,10 @@ def save_outs(self):
for out in self.outs:
out.save()

def ignore_outs(self):
for out in self.outs:
out.ignore()

@staticmethod
def _changed_entries(entries):
return [str(entry) for entry in entries if entry.changed_checksum()]
Expand Down Expand Up @@ -552,8 +555,8 @@ def _status_stage(self, ret):
if self.cmd_changed:
ret.append("changed command")

def changed_stage(self, warn=False):
if self.cmd_changed and warn:
def changed_stage(self):
if self.cmd_changed:
logger.debug(self._changed_stage_entry())
return self.cmd_changed

Expand Down
6 changes: 1 addition & 5 deletions dvc/stage/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ def sync_import(stage, dry=False, force=False):
if dry:
return

if (
not force
and not stage.changed_stage(warn=True)
and stage.already_cached()
):
if not force and not stage.changed_stage() and stage.already_cached():
stage.outs[0].checkout()
else:
stage.deps[0].download(stage.outs[0])
20 changes: 16 additions & 4 deletions dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ def cmd_run(stage, *args, **kwargs):


def _is_cached(stage):
return (
cached = (
not stage.is_callback
and not stage.always_changed
and stage.already_cached()
)
if cached:
logger.info("Stage '%s' is cached", stage.addressing)
return cached


def restored_from_cache(stage):
Expand All @@ -93,7 +96,10 @@ def restored_from_cache(stage):
return False
# restore stage from build cache
stage_cache.restore(stage)
return stage.outs_cached()
restored = stage.outs_cached()
if restored:
logger.info("Restored stage '%s' from run-cache", stage.addressing)
return restored


def run_stage(stage, dry=False, force=False, run_cache=False):
Expand All @@ -102,10 +108,16 @@ def run_stage(stage, dry=False, force=False, run_cache=False):
run_cache and restored_from_cache(stage)
)
if stage_cached:
logger.info("Stage is cached, skipping.")
logger.info("Skipping run, checking out outputs")
stage.checkout()
return

logger.info("Running command:\n\t%s", stage.cmd)
callback_str = "callback " if stage.is_callback else ""
logger.info(
"Running %s" "stage '%s' with command:",
callback_str,
stage.addressing,
)
logger.info("\t%s", stage.cmd)
if not dry:
cmd_run(stage)
8 changes: 6 additions & 2 deletions tests/unit/stage/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@

def test_run_stage_dry(caplog):
with caplog.at_level(level=logging.INFO, logger="dvc"):
run_stage(Stage(None, cmd="mycmd arg1 arg2"), dry=True)
assert caplog.messages == ["Running command:\n\tmycmd arg1 arg2"]
stage = Stage(None, "stage.dvc", cmd="mycmd arg1 arg2")
run_stage(stage, dry=True)
assert caplog.messages == [
"Running callback stage 'stage.dvc' with command:",
"\t" + "mycmd arg1 arg2",
]