From 660ee328177c5316ac78133ade0e201e123abec1 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Thu, 26 Nov 2020 21:20:38 +0800 Subject: [PATCH 1/8] fix #4838 `jobs` option for `dvc import` --- dvc/command/imp.py | 12 ++++++++++++ dvc/repo/imp_url.py | 2 ++ dvc/stage/__init__.py | 5 ++++- dvc/stage/imports.py | 4 ++-- dvc/stage/params.py | 1 + 5 files changed, 21 insertions(+), 3 deletions(-) diff --git a/dvc/command/imp.py b/dvc/command/imp.py index 9528282a09..6b896c59b8 100644 --- a/dvc/command/imp.py +++ b/dvc/command/imp.py @@ -19,6 +19,7 @@ def run(self): rev=self.args.rev, no_exec=self.args.no_exec, desc=self.args.desc, + jobs=self.args.jobs, ) except DvcException: logger.exception( @@ -82,4 +83,15 @@ def add_parser(subparsers, parent_parser): "This doesn't affect any DVC operations." ), ) + import_parser.add_argument( + "-j", + "--jobs", + type=int, + help=( + "Number of jobs to run simultaneously. " + "The default value is 4 * cpu_count(). " + "For SSH remotes, the default is 4. " + ), + metavar="", + ) import_parser.set_defaults(func=CmdImport) diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index 974cd8b83c..04239392a9 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -19,6 +19,7 @@ def imp_url( frozen=True, no_exec=False, desc=None, + jobs=None, ): from dvc.dvcfile import Dvcfile from dvc.stage import Stage, create_stage @@ -42,6 +43,7 @@ def imp_url( deps=[url], outs=[out], erepo=erepo, + jobs=jobs, ) if stage is None: diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 66dca0158c..5f059c28da 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -55,6 +55,7 @@ def loads_from(cls, repo, path, wdir, data): Stage.PARAM_ALWAYS_CHANGED, Stage.PARAM_MD5, Stage.PARAM_DESC, + Stage.PARAM_JOBS, "name", ], ), @@ -121,6 +122,7 @@ def __init__( stage_text=None, dvcfile=None, desc=None, + jobs=None, ): if deps is None: deps = [] @@ -139,6 +141,7 @@ def __init__( self._stage_text = stage_text self._dvcfile = dvcfile self.desc = desc + self.jobs = jobs self.raw_data = RawData() @property @@ -472,7 +475,7 @@ def run( self.remove_outs(ignore_remove=False, force=False) if not self.frozen and self.is_import: - sync_import(self, dry, force) + sync_import(self, dry, force, jobs=self.jobs) elif not self.frozen and self.cmd: run_stage(self, dry, force, **kwargs) else: diff --git a/dvc/stage/imports.py b/dvc/stage/imports.py index 3ca25e2d7e..01792002ac 100644 --- a/dvc/stage/imports.py +++ b/dvc/stage/imports.py @@ -13,7 +13,7 @@ def update_import(stage, rev=None): stage.frozen = frozen -def sync_import(stage, dry=False, force=False): +def sync_import(stage, dry=False, force=False, jobs=None): """Synchronize import's outs to the workspace.""" logger.info( "Importing '{dep}' -> '{out}'".format( @@ -27,4 +27,4 @@ def sync_import(stage, dry=False, force=False): stage.outs[0].checkout() else: stage.save_deps() - stage.deps[0].download(stage.outs[0]) + stage.deps[0].download(stage.outs[0], jobs=jobs) diff --git a/dvc/stage/params.py b/dvc/stage/params.py index 14ca18b250..9446a00b64 100644 --- a/dvc/stage/params.py +++ b/dvc/stage/params.py @@ -12,3 +12,4 @@ class StageParams: PARAM_METRICS = "metrics" PARAM_PLOTS = "plots" PARAM_DESC = "desc" + PARAM_JOBS = "jobs" From 66d17448332378e2fcba3390830567b81af650df Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Thu, 26 Nov 2020 22:25:34 +0800 Subject: [PATCH 2/8] option `jobs` effected --- dvc/dependency/repo.py | 4 ++-- dvc/output/base.py | 4 ++-- dvc/tree/base.py | 21 ++++++++++++++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index 7eb2f87f0a..558a3a464c 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -68,14 +68,14 @@ def save(self): def dumpd(self): return {self.PARAM_PATH: self.def_path, self.PARAM_REPO: self.def_repo} - def download(self, to): + def download(self, to, jobs=None): cache = self.repo.cache.local with self._make_repo(cache_dir=cache.cache_dir) as repo: if self.def_repo.get(self.PARAM_REV_LOCK) is None: self.def_repo[self.PARAM_REV_LOCK] = repo.get_rev() - _, _, cache_infos = repo.fetch_external([self.def_path]) + _, _, cache_infos = repo.fetch_external([self.def_path], jobs=jobs) cache.checkout(to.path_info, cache_infos[0]) diff --git a/dvc/output/base.py b/dvc/output/base.py index 1f4de887b7..b25f424a5c 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -333,8 +333,8 @@ def dumpd(self): def verify_metric(self): raise DvcException(f"verify metric is not supported for {self.scheme}") - def download(self, to): - self.tree.download(self.path_info, to.path_info) + def download(self, to, jobs=None): + self.tree.download(self.path_info, to.path_info, jobs=jobs) def checkout( self, diff --git a/dvc/tree/base.py b/dvc/tree/base.py index 1e76c6d36c..4e6925a6b6 100644 --- a/dvc/tree/base.py +++ b/dvc/tree/base.py @@ -377,6 +377,7 @@ def download( no_progress_bar=False, file_mode=None, dir_mode=None, + jobs=None, ): if not hasattr(self, "_download"): raise RemoteActionNotImplemented("download", self.scheme) @@ -393,14 +394,27 @@ def download( if self.isdir(from_info): return self._download_dir( - from_info, to_info, name, no_progress_bar, file_mode, dir_mode + from_info, + to_info, + name, + no_progress_bar, + file_mode, + dir_mode, + jobs, ) return self._download_file( from_info, to_info, name, no_progress_bar, file_mode, dir_mode ) def _download_dir( - self, from_info, to_info, name, no_progress_bar, file_mode, dir_mode + self, + from_info, + to_info, + name, + no_progress_bar, + file_mode, + dir_mode, + jobs, ): from_infos = list(self.walk_files(from_info)) to_infos = ( @@ -422,7 +436,8 @@ def _download_dir( dir_mode=dir_mode, ) ) - with ThreadPoolExecutor(max_workers=self.jobs) as executor: + max_workers = jobs or self.jobs + with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [ executor.submit(download_files, from_info, to_info) for from_info, to_info in zip(from_infos, to_infos) From 53b431610eb2482ed520c11fc353164e7b2a1470 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sun, 29 Nov 2020 09:52:02 +0800 Subject: [PATCH 3/8] Pass the unit tests parse --- tests/unit/command/test_imp.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unit/command/test_imp.py b/tests/unit/command/test_imp.py index ff04e65032..b9933aa497 100644 --- a/tests/unit/command/test_imp.py +++ b/tests/unit/command/test_imp.py @@ -16,6 +16,8 @@ def test_import(mocker): "version", "--desc", "description", + "--jobs", + "3", ] ) assert cli_args.func == CmdImport @@ -33,6 +35,7 @@ def test_import(mocker): rev="version", no_exec=False, desc="description", + jobs=3, ) @@ -67,4 +70,5 @@ def test_import_no_exec(mocker): rev="version", no_exec=True, desc="description", + jobs=None, ) From 4c2325ca47ecdb026127bedbc87514fc8592b3e5 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sun, 29 Nov 2020 12:11:01 +0800 Subject: [PATCH 4/8] Adding a functional test. --- tests/func/test_import.py | 68 +++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/tests/func/test_import.py b/tests/func/test_import.py index 0983e1bb0b..78321e0c51 100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -33,7 +33,9 @@ def test_import(tmp_dir, scm, dvc, erepo_dir): @pytest.mark.parametrize("src_is_dvc", [True, False]) -def test_import_git_file(tmp_dir, scm, dvc, git_dir, src_is_dvc): +def test_import_git_file( + tmp_dir, scm, dvc, git_dir, src_is_dvc +): # pylint:disable=unused-argument if src_is_dvc: git_dir.init(dvc=True) @@ -49,7 +51,9 @@ def test_import_git_file(tmp_dir, scm, dvc, git_dir, src_is_dvc): } -def test_import_cached_file(erepo_dir, tmp_dir, dvc, scm, monkeypatch): +def test_import_cached_file( + erepo_dir, tmp_dir, dvc, scm, monkeypatch +): # pylint:disable=unused-argument src = "some_file" dst = "some_file_imported" @@ -68,7 +72,9 @@ def test_import_cached_file(erepo_dir, tmp_dir, dvc, scm, monkeypatch): @pytest.mark.parametrize("src_is_dvc", [True, False]) -def test_import_git_dir(tmp_dir, scm, dvc, git_dir, src_is_dvc): +def test_import_git_dir( + tmp_dir, scm, dvc, git_dir, src_is_dvc +): # pylint:disable=unused-argument if src_is_dvc: git_dir.init(dvc=True) @@ -136,7 +142,9 @@ def test_import_file_from_dir(tmp_dir, scm, dvc, erepo_dir): assert (tmp_dir / "X.dvc").exists() -def test_import_file_from_dir_to_dir(tmp_dir, scm, dvc, erepo_dir): +def test_import_file_from_dir_to_dir( + tmp_dir, scm, dvc, erepo_dir +): # pylint:disable=unused-argument with erepo_dir.chdir(): erepo_dir.dvc_gen({"dir": {"foo": "foo"}}, commit="create dir") @@ -158,7 +166,9 @@ def test_import_file_from_dir_to_dir(tmp_dir, scm, dvc, erepo_dir): assert (tmp_dir / "dir" / "foo.dvc").exists() -def test_import_non_cached(erepo_dir, tmp_dir, dvc, scm): +def test_import_non_cached( + erepo_dir, tmp_dir, dvc, scm +): # pylint:disable=unused-argument src = "non_cached_output" dst = src + "_imported" @@ -199,7 +209,9 @@ def test_import_rev(tmp_dir, scm, dvc, erepo_dir): } -def test_pull_imported_stage(tmp_dir, dvc, erepo_dir): +def test_pull_imported_stage( + tmp_dir, dvc, erepo_dir +): # pylint:disable=unused-argument with erepo_dir.chdir(): erepo_dir.dvc_gen("foo", "foo content", commit="create foo") dvc.imp(os.fspath(erepo_dir), "foo", "foo_imported") @@ -248,7 +260,9 @@ def test_pull_imported_directory_stage(tmp_dir, dvc, erepo_dir): assert (tmp_dir / "dir_imported").read_text() == {"foo": "foo content"} -def test_download_error_pulling_imported_stage(tmp_dir, dvc, erepo_dir): +def test_download_error_pulling_imported_stage( + tmp_dir, dvc, erepo_dir +): # pylint:disable=unused-argument with erepo_dir.chdir(): erepo_dir.dvc_gen("foo", "foo content", commit="create foo") dvc.imp(os.fspath(erepo_dir), "foo", "foo_imported") @@ -300,7 +314,9 @@ def test_pull_non_workspace(tmp_dir, scm, dvc, erepo_dir): assert os.path.exists(stage.outs[0].cache_path) -def test_import_non_existing(erepo_dir, tmp_dir, dvc): +def test_import_non_existing( + erepo_dir, tmp_dir, dvc +): # pylint:disable=unused-argument with pytest.raises(PathMissingError): tmp_dir.dvc.imp(os.fspath(erepo_dir), "invalid_output") @@ -330,7 +346,7 @@ def test_pull_no_rev_lock(erepo_dir, tmp_dir, dvc): def test_import_from_bare_git_repo( tmp_dir, make_tmp_dir, erepo_dir, local_cloud -): +): # pylint:disable=unused-argument import git git.Repo.init(os.fspath(tmp_dir), bare=True) @@ -350,7 +366,7 @@ def test_import_from_bare_git_repo( def test_import_pipeline_tracked_outs( tmp_dir, dvc, scm, erepo_dir, run_copy, local_remote -): +): # pylint:disable=unused-argument from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK tmp_dir.gen("foo", "foo") @@ -367,7 +383,7 @@ def test_import_pipeline_tracked_outs( assert (erepo_dir / "baz").read_text() == "foo" -def test_local_import(tmp_dir, dvc, scm): +def test_local_import(tmp_dir, dvc, scm): # pylint:disable=unused-argument tmp_dir.dvc_gen("foo", "foo", commit="init") (tmp_dir / "outdir").mkdir() dvc.imp(".", "foo", out="outdir") @@ -388,7 +404,9 @@ def test_import_mixed_dir(tmp_dir, dvc, erepo_dir): @pytest.mark.parametrize("is_dvc", [True, False]) @pytest.mark.parametrize("files", [{"foo": "foo"}, {"dir": {"bar": "bar"}}]) -def test_import_subrepos(tmp_dir, erepo_dir, dvc, scm, is_dvc, files): +def test_import_subrepos( + tmp_dir, erepo_dir, dvc, scm, is_dvc, files +): # pylint:disable=unused-argument subrepo = erepo_dir / "subrepo" make_subrepo(subrepo, erepo_dir.scm) gen = subrepo.dvc_gen if is_dvc else subrepo.scm_gen @@ -451,7 +469,9 @@ def test_pull_imported_stage_from_subrepos( assert (tmp_dir / "out").read_text() == files[key] -def test_try_import_complete_repo(tmp_dir, dvc, erepo_dir): +def test_try_import_complete_repo( + tmp_dir, dvc, erepo_dir +): # pylint:disable=unused-argument subrepo = erepo_dir / "subrepo" make_subrepo(subrepo, erepo_dir.scm) with subrepo.chdir(): @@ -475,3 +495,25 @@ def test_import_with_no_exec(tmp_dir, dvc, erepo_dir): dst = tmp_dir / "foo_imported" assert not dst.exists() + + +def test_import_with_jobs(mocker, dvc, erepo_dir): + from dvc.data_cloud import DataCloud + + with erepo_dir.chdir(): + erepo_dir.dvc_gen( + { + "dir1": { + "file1": "file1", + "file2": "file2", + "file3": "file3", + "file4": "file4", + }, + }, + commit="init", + ) + + spy = mocker.spy(DataCloud, "pull") + dvc.imp(os.fspath(erepo_dir), "dir1", jobs=3) + run_jobs = tuple(spy.call_args_list[0])[1].get("jobs") + assert run_jobs == 3 From f6fa37b65ae9fc6f9bddbfdaf2046f48465c0223 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Mon, 30 Nov 2020 22:06:39 +0800 Subject: [PATCH 5/8] Pass `jobs` to `run` and remove some pylint --- dvc/repo/imp_url.py | 3 +-- dvc/stage/__init__.py | 6 ++--- tests/func/test_import.py | 46 +++++++++++---------------------------- 3 files changed, 16 insertions(+), 39 deletions(-) diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index 04239392a9..cad9652b4b 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -43,7 +43,6 @@ def imp_url( deps=[url], outs=[out], erepo=erepo, - jobs=jobs, ) if stage is None: @@ -63,7 +62,7 @@ def imp_url( if no_exec: stage.ignore_outs() else: - stage.run() + stage.run(jobs=jobs) stage.frozen = frozen diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 5f059c28da..6c416702c8 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -55,7 +55,6 @@ def loads_from(cls, repo, path, wdir, data): Stage.PARAM_ALWAYS_CHANGED, Stage.PARAM_MD5, Stage.PARAM_DESC, - Stage.PARAM_JOBS, "name", ], ), @@ -122,7 +121,6 @@ def __init__( stage_text=None, dvcfile=None, desc=None, - jobs=None, ): if deps is None: deps = [] @@ -141,7 +139,6 @@ def __init__( self._stage_text = stage_text self._dvcfile = dvcfile self.desc = desc - self.jobs = jobs self.raw_data = RawData() @property @@ -475,7 +472,8 @@ def run( self.remove_outs(ignore_remove=False, force=False) if not self.frozen and self.is_import: - sync_import(self, dry, force, jobs=self.jobs) + jobs = kwargs.get("jobs", None) + sync_import(self, dry, force, jobs) elif not self.frozen and self.cmd: run_stage(self, dry, force, **kwargs) else: diff --git a/tests/func/test_import.py b/tests/func/test_import.py index 78321e0c51..996bafd1c2 100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -33,9 +33,7 @@ def test_import(tmp_dir, scm, dvc, erepo_dir): @pytest.mark.parametrize("src_is_dvc", [True, False]) -def test_import_git_file( - tmp_dir, scm, dvc, git_dir, src_is_dvc -): # pylint:disable=unused-argument +def test_import_git_file(tmp_dir, scm, dvc, git_dir, src_is_dvc): if src_is_dvc: git_dir.init(dvc=True) @@ -51,9 +49,7 @@ def test_import_git_file( } -def test_import_cached_file( - erepo_dir, tmp_dir, dvc, scm, monkeypatch -): # pylint:disable=unused-argument +def test_import_cached_file(erepo_dir, tmp_dir, dvc, scm, monkeypatch): src = "some_file" dst = "some_file_imported" @@ -72,9 +68,7 @@ def test_import_cached_file( @pytest.mark.parametrize("src_is_dvc", [True, False]) -def test_import_git_dir( - tmp_dir, scm, dvc, git_dir, src_is_dvc -): # pylint:disable=unused-argument +def test_import_git_dir(tmp_dir, scm, dvc, git_dir, src_is_dvc): if src_is_dvc: git_dir.init(dvc=True) @@ -142,9 +136,7 @@ def test_import_file_from_dir(tmp_dir, scm, dvc, erepo_dir): assert (tmp_dir / "X.dvc").exists() -def test_import_file_from_dir_to_dir( - tmp_dir, scm, dvc, erepo_dir -): # pylint:disable=unused-argument +def test_import_file_from_dir_to_dir(tmp_dir, scm, dvc, erepo_dir): with erepo_dir.chdir(): erepo_dir.dvc_gen({"dir": {"foo": "foo"}}, commit="create dir") @@ -166,9 +158,7 @@ def test_import_file_from_dir_to_dir( assert (tmp_dir / "dir" / "foo.dvc").exists() -def test_import_non_cached( - erepo_dir, tmp_dir, dvc, scm -): # pylint:disable=unused-argument +def test_import_non_cached(erepo_dir, tmp_dir, dvc, scm): src = "non_cached_output" dst = src + "_imported" @@ -209,9 +199,7 @@ def test_import_rev(tmp_dir, scm, dvc, erepo_dir): } -def test_pull_imported_stage( - tmp_dir, dvc, erepo_dir -): # pylint:disable=unused-argument +def test_pull_imported_stage(tmp_dir, dvc, erepo_dir): with erepo_dir.chdir(): erepo_dir.dvc_gen("foo", "foo content", commit="create foo") dvc.imp(os.fspath(erepo_dir), "foo", "foo_imported") @@ -260,9 +248,7 @@ def test_pull_imported_directory_stage(tmp_dir, dvc, erepo_dir): assert (tmp_dir / "dir_imported").read_text() == {"foo": "foo content"} -def test_download_error_pulling_imported_stage( - tmp_dir, dvc, erepo_dir -): # pylint:disable=unused-argument +def test_download_error_pulling_imported_stage(tmp_dir, dvc, erepo_dir): with erepo_dir.chdir(): erepo_dir.dvc_gen("foo", "foo content", commit="create foo") dvc.imp(os.fspath(erepo_dir), "foo", "foo_imported") @@ -314,9 +300,7 @@ def test_pull_non_workspace(tmp_dir, scm, dvc, erepo_dir): assert os.path.exists(stage.outs[0].cache_path) -def test_import_non_existing( - erepo_dir, tmp_dir, dvc -): # pylint:disable=unused-argument +def test_import_non_existing(erepo_dir, tmp_dir, dvc): with pytest.raises(PathMissingError): tmp_dir.dvc.imp(os.fspath(erepo_dir), "invalid_output") @@ -346,7 +330,7 @@ def test_pull_no_rev_lock(erepo_dir, tmp_dir, dvc): def test_import_from_bare_git_repo( tmp_dir, make_tmp_dir, erepo_dir, local_cloud -): # pylint:disable=unused-argument +): import git git.Repo.init(os.fspath(tmp_dir), bare=True) @@ -366,7 +350,7 @@ def test_import_from_bare_git_repo( def test_import_pipeline_tracked_outs( tmp_dir, dvc, scm, erepo_dir, run_copy, local_remote -): # pylint:disable=unused-argument +): from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK tmp_dir.gen("foo", "foo") @@ -383,7 +367,7 @@ def test_import_pipeline_tracked_outs( assert (erepo_dir / "baz").read_text() == "foo" -def test_local_import(tmp_dir, dvc, scm): # pylint:disable=unused-argument +def test_local_import(tmp_dir, dvc, scm): tmp_dir.dvc_gen("foo", "foo", commit="init") (tmp_dir / "outdir").mkdir() dvc.imp(".", "foo", out="outdir") @@ -404,9 +388,7 @@ def test_import_mixed_dir(tmp_dir, dvc, erepo_dir): @pytest.mark.parametrize("is_dvc", [True, False]) @pytest.mark.parametrize("files", [{"foo": "foo"}, {"dir": {"bar": "bar"}}]) -def test_import_subrepos( - tmp_dir, erepo_dir, dvc, scm, is_dvc, files -): # pylint:disable=unused-argument +def test_import_subrepos(tmp_dir, erepo_dir, dvc, scm, is_dvc, files): subrepo = erepo_dir / "subrepo" make_subrepo(subrepo, erepo_dir.scm) gen = subrepo.dvc_gen if is_dvc else subrepo.scm_gen @@ -469,9 +451,7 @@ def test_pull_imported_stage_from_subrepos( assert (tmp_dir / "out").read_text() == files[key] -def test_try_import_complete_repo( - tmp_dir, dvc, erepo_dir -): # pylint:disable=unused-argument +def test_try_import_complete_repo(tmp_dir, dvc, erepo_dir): subrepo = erepo_dir / "subrepo" make_subrepo(subrepo, erepo_dir.scm) with subrepo.chdir(): From ae368c17ac1d4861bbeaeffebd3aaaf609d9556e Mon Sep 17 00:00:00 2001 From: Gao Date: Fri, 4 Dec 2020 12:51:22 +0800 Subject: [PATCH 6/8] Update dvc/stage/__init__.py Co-authored-by: Saugat Pachhai --- dvc/stage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 6c416702c8..ad3af76c28 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -472,7 +472,7 @@ def run( self.remove_outs(ignore_remove=False, force=False) if not self.frozen and self.is_import: - jobs = kwargs.get("jobs", None) + jobs = kwargs.get("jobs") sync_import(self, dry, force, jobs) elif not self.frozen and self.cmd: run_stage(self, dry, force, **kwargs) From 5cca61668bc1c60a714575d2793b175208ca9974 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Fri, 4 Dec 2020 22:40:43 +0800 Subject: [PATCH 7/8] remove unused param --- dvc/stage/params.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dvc/stage/params.py b/dvc/stage/params.py index 9446a00b64..14ca18b250 100644 --- a/dvc/stage/params.py +++ b/dvc/stage/params.py @@ -12,4 +12,3 @@ class StageParams: PARAM_METRICS = "metrics" PARAM_PLOTS = "plots" PARAM_DESC = "desc" - PARAM_JOBS = "jobs" From 498e94e722956c221a529578d23da7cbb85d68e9 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Wed, 9 Dec 2020 09:33:04 +0800 Subject: [PATCH 8/8] Solve test fail on windows --- dvc/stage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index ad3af76c28..6c416702c8 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -472,7 +472,7 @@ def run( self.remove_outs(ignore_remove=False, force=False) if not self.frozen and self.is_import: - jobs = kwargs.get("jobs") + jobs = kwargs.get("jobs", None) sync_import(self, dry, force, jobs) elif not self.frozen and self.cmd: run_stage(self, dry, force, **kwargs)