From 438d5362024ec170d509c4af05c276563bb2f5ce Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Wed, 18 Mar 2020 17:56:16 +0545 Subject: [PATCH 1/7] pull: support pulling git-only imports --- dvc/repo/fetch.py | 26 +++++++++++++++-- tests/func/test_data_cloud.py | 54 +++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index ab6e178d8c..4131848300 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -2,9 +2,9 @@ from dvc.cache import NamedCache from dvc.config import NoRemoteError -from dvc.exceptions import DownloadError -from dvc.exceptions import OutputNotFoundError +from dvc.exceptions import DownloadError, OutputNotFoundError from dvc.scm.base import CloneError +from dvc.path_info import PathInfo logger = logging.getLogger(__name__) @@ -74,8 +74,12 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): failed = 0 try: with external_repo(repo_url, repo_rev) as repo: - repo.cache.local.cache_dir = self.cache.local.cache_dir + if not hasattr(repo, "cache"): + return _fetch_external_git( + self.cache.local, repo.root_dir, files + ) + repo.cache.local.cache_dir = self.cache.local.cache_dir with repo.state: cache = NamedCache() for name in files: @@ -101,3 +105,19 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): ) return 0, failed + + +def _fetch_external_git(cache, root_dir, files): + failed, downloaded = 0, 0 + root_dir = PathInfo(root_dir) + for file in files: + info = cache.save_info(root_dir / file) + if info.get(cache.PARAM_CHECKSUM) is None: + failed += 1 + continue + + if cache.changed_cache(info[cache.PARAM_CHECKSUM]): + downloaded += 1 + cache.save(root_dir / file, info) + + return downloaded, failed diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index aaf1a12c5c..34560c5ebb 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -23,6 +23,7 @@ from dvc.remote.base import STATUS_DELETED, STATUS_NEW, STATUS_OK from dvc.utils import file_md5 from dvc.utils.stage import dump_stage_file, load_stage_file +from dvc.external_repo import clean_repos from tests.basic_env import TestDvc from tests.remotes import ( @@ -709,3 +710,56 @@ def test_verify_checksums(tmp_dir, scm, dvc, mocker, tmp_path_factory): dvc.pull() assert checksum_spy.call_count == 3 + + +def test_pull_git_imports(tmp_dir, dvc, scm, git_dir): + with git_dir.chdir(): + git_dir.scm_gen({"dir": {"bar": "bar"}}, commit="second") + git_dir.scm_gen("foo", "foo", commit="first") + + dvc.imp(fspath(git_dir), "foo") + dvc.imp(fspath(git_dir), "dir", out="new_dir", rev="HEAD~") + + assert dvc.pull()["downloaded"] == 0 + + os.remove("foo") + shutil.rmtree("new_dir") + shutil.rmtree(dvc.cache.local.cache_dir) + os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) + clean_repos() + + assert dvc.pull(force=True)["downloaded"] == 2 + + assert os.path.exists("foo") + assert open("foo").read() == "foo" + + assert os.path.isdir("new_dir") + assert open(os.path.join("new_dir", "bar")).read() == "bar" + + +def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): + with erepo_dir.chdir(): + erepo_dir.dvc_gen({"dir": {"bar": "bar"}}, commit="second") + erepo_dir.dvc_gen("foo", "foo", commit="first") + + os.remove("foo") + shutil.rmtree("dir") + + dvc.imp(fspath(erepo_dir), "foo") + dvc.imp(fspath(erepo_dir), "dir", out="new_dir", rev="HEAD~") + + assert dvc.pull()["downloaded"] == 0 + + os.remove("foo") + shutil.rmtree("new_dir") + shutil.rmtree(dvc.cache.local.cache_dir) + os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) + clean_repos() + + assert dvc.pull(force=True)["downloaded"] == 2 + + assert os.path.exists("foo") + assert open("foo").read() == "foo" + + assert os.path.isdir("new_dir") + assert open(os.path.join("new_dir", "bar")).read() == "bar" From f63ed29a0ec0c279651f2a7e6f7fea0bf2bbd1ab Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Mon, 23 Mar 2020 14:23:34 +0545 Subject: [PATCH 2/7] pull: also add git imports from external dvc repo --- dvc/repo/fetch.py | 77 +++++++++++++++++++---------------- tests/func/test_data_cloud.py | 25 ++++++++++++ 2 files changed, 67 insertions(+), 35 deletions(-) diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 4131848300..46e53b17aa 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -69,55 +69,62 @@ def _fetch( def _fetch_external(self, repo_url, repo_rev, files, jobs): - from dvc.external_repo import external_repo + from dvc.external_repo import external_repo, ExternalRepo - failed = 0 + failed, downloaded = 0, 0 try: with external_repo(repo_url, repo_rev) as repo: - if not hasattr(repo, "cache"): - return _fetch_external_git( - self.cache.local, repo.root_dir, files - ) - - repo.cache.local.cache_dir = self.cache.local.cache_dir - with repo.state: - cache = NamedCache() - for name in files: - try: - out = repo.find_out_by_relpath(name) - except OutputNotFoundError: - failed += 1 - logger.exception( - "failed to fetch data for '{}'".format(name) - ) - continue - else: - cache.update(out.get_used_cache()) - - try: - return repo.cloud.pull(cache, jobs=jobs), failed - except DownloadError as exc: - failed += exc.amount + is_dvc_repo = isinstance(repo, ExternalRepo) + # gather git-only tracked files if dvc repo + git_files = [] if is_dvc_repo else files + if is_dvc_repo: + repo.cache.local.cache_dir = self.cache.local.cache_dir + with repo.state: + cache = NamedCache() + for name in files: + try: + out = repo.find_out_by_relpath(name) + except OutputNotFoundError: + # try to add to cache if they are git-tracked files + git_files.append(name) + else: + cache.update(out.get_used_cache()) + + try: + downloaded += repo.cloud.pull(cache, jobs=jobs) + except DownloadError as exc: + failed += exc.amount + + d, f = _git_to_cache(self.cache.local, repo.root_dir, git_files) + downloaded += d + failed += f except CloneError: failed += 1 logger.exception( "failed to fetch data for '{}'".format(", ".join(files)) ) - return 0, failed + return downloaded, failed -def _fetch_external_git(cache, root_dir, files): - failed, downloaded = 0, 0 - root_dir = PathInfo(root_dir) +def _git_to_cache(cache, repo_root, files): + """Save files from a git repo directly to the cache.""" + failed = set() + num_downloads = 0 + repo_root = PathInfo(repo_root) for file in files: - info = cache.save_info(root_dir / file) + info = cache.save_info(repo_root / file) if info.get(cache.PARAM_CHECKSUM) is None: - failed += 1 + failed.add(file) continue if cache.changed_cache(info[cache.PARAM_CHECKSUM]): - downloaded += 1 - cache.save(root_dir / file, info) + num_downloads += 1 + cache.save(repo_root / file, info) - return downloaded, failed + if failed: + logger.exception( + "failed to fetch data for {}".format(", ".join(failed)) + ) + + return num_downloads, len(failed) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 34560c5ebb..1c537bc5b4 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -763,3 +763,28 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert os.path.isdir("new_dir") assert open(os.path.join("new_dir", "bar")).read() == "bar" + + +def test_pull_git_imports_from_dvc_repo(tmp_dir, dvc, scm, erepo_dir): + with erepo_dir.chdir(): + erepo_dir.scm_gen({"dir": {"bar": "bar"}}, commit="second") + erepo_dir.scm_gen("foo", "foo", commit="first") + + dvc.imp(fspath(erepo_dir), "foo") + dvc.imp(fspath(erepo_dir), "dir", out="new_dir", rev="HEAD~") + + assert dvc.pull()["downloaded"] == 0 + + os.remove("foo") + shutil.rmtree("new_dir") + shutil.rmtree(dvc.cache.local.cache_dir) + os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) + clean_repos() + + assert dvc.pull(force=True)["downloaded"] == 2 + + assert os.path.exists("foo") + assert open("foo").read() == "foo" + + assert os.path.isdir("new_dir") + assert open(os.path.join("new_dir", "bar")).read() == "bar" From 822234245efc7781e4c5b46804ad0e8071b8eecd Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Mon, 23 Mar 2020 14:55:18 +0545 Subject: [PATCH 3/7] fetch: add logging --- dvc/repo/fetch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 46e53b17aa..c8050579d1 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -119,6 +119,7 @@ def _git_to_cache(cache, repo_root, files): continue if cache.changed_cache(info[cache.PARAM_CHECKSUM]): + logger.debug("fetched '%s' from '%s' repo", file, repo_root) num_downloads += 1 cache.save(repo_root / file, info) From af47c4be32f1f7e900f325ee69cf4479101c5759 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Mon, 23 Mar 2020 15:28:52 +0545 Subject: [PATCH 4/7] pull: don't save link to state When `checkout` or `pull` is done, it'll try to cleanup unused links. So, when we add from erepo to cache, it creates a link, which DVC will try to cleanup. Also, it'll provide unwanted checkout change details. So, skipping those --- dvc/remote/base.py | 16 +++++++++------- dvc/repo/fetch.py | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c12f5df91e..89de458e10 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -468,7 +468,7 @@ def _cache_is_copy(self, path_info): self.cache_type_confirmed = True return self.cache_types[0] == "copy" - def _save_dir(self, path_info, checksum): + def _save_dir(self, path_info, checksum, save_link=True): cache_info = self.checksum_to_path_info(checksum) dir_info = self.get_dir_cache(checksum) @@ -479,7 +479,9 @@ def _save_dir(self, path_info, checksum): entry_checksum = entry[self.PARAM_CHECKSUM] self._save_file(entry_info, entry_checksum, save_link=False) - self.state.save_link(path_info) + if save_link: + self.state.save_link(path_info) + self.state.save(cache_info, checksum) self.state.save(path_info, checksum) @@ -510,7 +512,7 @@ def walk_files(self, path_info): def protect(path_info): pass - def save(self, path_info, checksum_info): + def save(self, path_info, checksum_info, save_link=True): if path_info.scheme != self.scheme: raise RemoteActionNotImplemented( "save {} -> {}".format(path_info.scheme, self.scheme), @@ -518,15 +520,15 @@ def save(self, path_info, checksum_info): ) checksum = checksum_info[self.PARAM_CHECKSUM] - self._save(path_info, checksum) + self._save(path_info, checksum, save_link) - def _save(self, path_info, checksum): + def _save(self, path_info, checksum, save_link=True): to_info = self.checksum_to_path_info(checksum) logger.debug("Saving '%s' to '%s'.", path_info, to_info) if self.isdir(path_info): - self._save_dir(path_info, checksum) + self._save_dir(path_info, checksum, save_link) return - self._save_file(path_info, checksum) + self._save_file(path_info, checksum, save_link) def _handle_transfer_exception( self, from_info, to_info, exception, operation diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index c8050579d1..fbbaab1be8 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -121,7 +121,7 @@ def _git_to_cache(cache, repo_root, files): if cache.changed_cache(info[cache.PARAM_CHECKSUM]): logger.debug("fetched '%s' from '%s' repo", file, repo_root) num_downloads += 1 - cache.save(repo_root / file, info) + cache.save(repo_root / file, info, save_link=False) if failed: logger.exception( From d664394f11ecc34ff52cb32ba82739e124e0c471 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Mon, 23 Mar 2020 16:45:58 +0545 Subject: [PATCH 5/7] tests: refactor/rewrite --- tests/func/test_data_cloud.py | 50 +++++++++-------------------------- 1 file changed, 13 insertions(+), 37 deletions(-) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 1c537bc5b4..f6609384bc 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -40,6 +40,7 @@ TEST_GCP_CREDS_FILE, TEST_REMOTE, ) +from dvc.utils.fs import remove class TestDataCloud(TestDvc): @@ -712,19 +713,20 @@ def test_verify_checksums(tmp_dir, scm, dvc, mocker, tmp_path_factory): assert checksum_spy.call_count == 3 -def test_pull_git_imports(tmp_dir, dvc, scm, git_dir): - with git_dir.chdir(): - git_dir.scm_gen({"dir": {"bar": "bar"}}, commit="second") - git_dir.scm_gen("foo", "foo", commit="first") +@pytest.mark.parametrize("erepo", ["git_dir", "erepo_dir"]) +def test_pull_git_imports(request, tmp_dir, dvc, scm, erepo): + erepo = request.getfixturevalue(erepo) + with erepo.chdir(): + erepo.scm_gen({"dir": {"bar": "bar"}}, commit="second") + erepo.scm_gen("foo", "foo", commit="first") - dvc.imp(fspath(git_dir), "foo") - dvc.imp(fspath(git_dir), "dir", out="new_dir", rev="HEAD~") + dvc.imp(fspath(erepo), "foo") + dvc.imp(fspath(erepo), "dir", out="new_dir", rev="HEAD~") assert dvc.pull()["downloaded"] == 0 - os.remove("foo") - shutil.rmtree("new_dir") - shutil.rmtree(dvc.cache.local.cache_dir) + for item in ["foo", "new_dir", dvc.cache.local.cache_dir]: + remove(item) os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) clean_repos() @@ -750,34 +752,8 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert dvc.pull()["downloaded"] == 0 - os.remove("foo") - shutil.rmtree("new_dir") - shutil.rmtree(dvc.cache.local.cache_dir) - os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) - clean_repos() - - assert dvc.pull(force=True)["downloaded"] == 2 - - assert os.path.exists("foo") - assert open("foo").read() == "foo" - - assert os.path.isdir("new_dir") - assert open(os.path.join("new_dir", "bar")).read() == "bar" - - -def test_pull_git_imports_from_dvc_repo(tmp_dir, dvc, scm, erepo_dir): - with erepo_dir.chdir(): - erepo_dir.scm_gen({"dir": {"bar": "bar"}}, commit="second") - erepo_dir.scm_gen("foo", "foo", commit="first") - - dvc.imp(fspath(erepo_dir), "foo") - dvc.imp(fspath(erepo_dir), "dir", out="new_dir", rev="HEAD~") - - assert dvc.pull()["downloaded"] == 0 - - os.remove("foo") - shutil.rmtree("new_dir") - shutil.rmtree(dvc.cache.local.cache_dir) + for item in ["foo", "new_dir", dvc.cache.local.cache_dir]: + remove(item) os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) clean_repos() From 2fbe8ce6b47ffec370c4a9695a8dc6b0f973e86a Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Mon, 23 Mar 2020 17:11:45 +0545 Subject: [PATCH 6/7] flake8: why it's complaining? --- tests/func/test_data_cloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index f6609384bc..b87907503f 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -22,6 +22,7 @@ from dvc.remote import RemoteSSH from dvc.remote.base import STATUS_DELETED, STATUS_NEW, STATUS_OK from dvc.utils import file_md5 +from dvc.utils.fs import remove from dvc.utils.stage import dump_stage_file, load_stage_file from dvc.external_repo import clean_repos from tests.basic_env import TestDvc @@ -40,7 +41,6 @@ TEST_GCP_CREDS_FILE, TEST_REMOTE, ) -from dvc.utils.fs import remove class TestDataCloud(TestDvc): From ba616f2528e0828d5bb0888a1ae660558ecf852e Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Tue, 24 Mar 2020 16:34:47 +0545 Subject: [PATCH 7/7] tests: do not leak file descriptors --- tests/func/test_data_cloud.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index b87907503f..8f4c6574b3 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -732,11 +732,11 @@ def test_pull_git_imports(request, tmp_dir, dvc, scm, erepo): assert dvc.pull(force=True)["downloaded"] == 2 - assert os.path.exists("foo") - assert open("foo").read() == "foo" + assert (tmp_dir / "foo").exists() + assert (tmp_dir / "foo").read_text() == "foo" - assert os.path.isdir("new_dir") - assert open(os.path.join("new_dir", "bar")).read() == "bar" + assert (tmp_dir / "new_dir").exists() + assert (tmp_dir / "new_dir" / "bar").read_text() == "bar" def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): @@ -759,8 +759,8 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert dvc.pull(force=True)["downloaded"] == 2 - assert os.path.exists("foo") - assert open("foo").read() == "foo" + assert (tmp_dir / "foo").exists() + assert (tmp_dir / "foo").read_text() == "foo" - assert os.path.isdir("new_dir") - assert open(os.path.join("new_dir", "bar")).read() == "bar" + assert (tmp_dir / "new_dir").exists() + assert (tmp_dir / "new_dir" / "bar").read_text() == "bar"