diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index d788031660..4d1f3b9cc3 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -4,7 +4,6 @@ from dvc.command.base import CmdBase, append_doc_link, fix_subparsers from dvc.exceptions import DvcException - logger = logging.getLogger(__name__) @@ -32,6 +31,31 @@ def _show(self, target, commands, outs, locked): else: logger.info(stage.addressing) + @staticmethod + def _build_output_graph(G, target_stage): + import networkx + from itertools import product + + nodes = {str(out) for out in target_stage.outs} + edges = [] + + for from_stage, to_stage in networkx.edge_dfs(G, target_stage): + from_stage_deps = {dep.path_info.parts for dep in from_stage.deps} + to_outs = { + to_out + for to_out in to_stage.outs + if to_out.path_info.parts in from_stage_deps + } + from_outs = { + from_out + for from_out in from_stage.outs + if str(from_out) in nodes + } + nodes |= {str(to_out) for to_out in to_outs} + for from_out, to_out in product(from_outs, to_outs): + edges.append((str(from_out), str(to_out))) + return nodes, edges + def _build_graph(self, target, commands=False, outs=False): import networkx from dvc import dvcfile @@ -48,10 +72,7 @@ def _build_graph(self, target, commands=False, outs=False): if stage.cmd is None: continue nodes.add(stage.cmd) - elif outs: - for out in stage.outs: - nodes.add(str(out)) - else: + elif not outs: nodes.add(stage.addressing) edges = [] @@ -60,13 +81,12 @@ def _build_graph(self, target, commands=False, outs=False): if to_stage.cmd is None: continue edges.append((from_stage.cmd, to_stage.cmd)) - elif outs: - for from_out in from_stage.outs: - for to_out in to_stage.outs: - edges.append((str(from_out), str(to_out))) - else: + elif not outs: edges.append((from_stage.addressing, to_stage.addressing)) + if outs: + nodes, edges = self._build_output_graph(G, target_stage) + return list(nodes), edges, networkx.is_tree(G) def _show_ascii(self, target, commands, outs): diff --git a/tests/func/test_pipeline.py b/tests/func/test_pipeline.py index eda4a14713..7d39ce30a0 100644 --- a/tests/func/test_pipeline.py +++ b/tests/func/test_pipeline.py @@ -275,7 +275,7 @@ def test_split_pipeline(tmp_dir, scm, dvc): ) command = CmdPipelineShow([]) - nodes, edges, is_tree = command._build_graph( + nodes, edges, _ = command._build_graph( stage.path, commands=False, outs=True ) assert set(nodes) == {"data", "data_train", "data_valid", "result"} @@ -320,11 +320,36 @@ def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy): run_copy("foo", "bar", name="copy-foo-bar") run_copy("bar", "foobar", single_stage=True) command = CmdPipelineShow([]) - nodes, edges, is_tree = command._build_graph("foobar.dvc") + nodes, edges, _ = command._build_graph("foobar.dvc") assert set(nodes) == {"dvc.yaml:copy-foo-bar", "foobar.dvc"} assert set(edges) == { ("foobar.dvc", "dvc.yaml:copy-foo-bar"), } - nodes, edges, is_tree = command._build_graph("dvc.yaml:copy-foo-bar") + nodes, *_ = command._build_graph("dvc.yaml:copy-foo-bar") assert set(nodes) == {"dvc.yaml:copy-foo-bar"} + + +def test_pipeline_multi_outputs_stages(dvc): + dvc.run( + outs=["alice", "bob"], + cmd="echo alice>alice && echo bob>bob", + single_stage=True, + ) + dvc.run( + deps=["alice"], + outs=["mary", "mike"], + cmd="echo mary>mary && echo mike>mike", + single_stage=True, + ) + stage = dvc.run( + deps=["mary"], + outs=["carol"], + cmd="echo carol>carol", + single_stage=True, + ) + + command = CmdPipelineShow([]) + nodes, edges, _ = command._build_graph(stage.path, outs=True) + assert set(nodes) == {"alice", "mary", "carol"} + assert set(edges) == {("carol", "mary"), ("mary", "alice")}