From 1cd02150abda694bada69af0ce2719abf76fd3ec Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Thu, 7 Apr 2022 19:42:39 +0300 Subject: [PATCH 1/8] repofs: use fs.walk directly --- dvc/fs/repo.py | 68 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index 822104dcdf..c8c71a5178 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -351,19 +351,29 @@ def _subrepo_walk(self, dir_path, **kwargs): ignore_subrepos is set to False. """ fs, dvc_fs, dvc_path = self._get_fs_pair(dir_path) - fs_walk = fs.walk(dir_path, topdown=True) - yield from self._walk(fs_walk, dvc_fs, dvc_path, **kwargs) + yield from self._walk(fs, dir_path, dvc_fs, dvc_path, **kwargs) - def _walk(self, repo_walk, dvc_fs, dvc_path, dvcfiles=False): + def _walk( + self, + fs, + fs_path, + dvc_fs, + dvc_path, + dvcfiles=False, + dvcignore=None, + **kwargs, + ): from dvc.dvcfile import is_valid_filename from dvc.ignore import DvcIgnore - assert repo_walk + assert dvcignore dvc_dirs, dvc_fnames = _ls(dvc_fs, dvc_path) if dvc_fs else ([], []) try: - repo_root, repo_dirs, repo_fnames = next(repo_walk) + _, repo_dirs, repo_fnames = next( + dvcignore.walk(fs, fs_path, **kwargs) + ) except StopIteration: return @@ -386,10 +396,10 @@ def _func(fname): # merge file lists files = set(filter(_func, dvc_fnames + repo_fnames)) - yield repo_root, dirs, list(files) + yield fs_path, dirs, list(files) def is_dvc_repo(d): - return self._is_dvc_repo(os.path.join(repo_root, d)) + return self._is_dvc_repo(os.path.join(fs_path, d)) # remove subrepos to prevent it from being traversed subrepos = set(filter(is_dvc_repo, repo_only)) @@ -400,21 +410,34 @@ def is_dvc_repo(d): for dirname in dirs: if dirname in subrepos: - dir_path = os.path.join(repo_root, dirname) - yield from self._subrepo_walk(dir_path, dvcfiles=dvcfiles) + dir_path = os.path.join(fs_path, dirname) + yield from self._subrepo_walk( + dir_path, dvcfiles=dvcfiles, dvcignore=dvcignore, **kwargs + ) elif dirname in shared: yield from self._walk( - repo_walk, + fs, + fs.path.join(fs_path, dirname), dvc_fs, dvc_fs.path.join(dvc_path, dirname), dvcfiles=dvcfiles, + dvcignore=dvcignore, + **kwargs, ) elif dirname in dvc_set: yield from _wrap_walk( dvc_fs, dvc_fs.path.join(dvc_path, dirname) ) elif dirname in repo_set: - yield from self._walk(repo_walk, None, None, dvcfiles=dvcfiles) + yield from self._walk( + fs, + fs.path.join(fs_path, dirname), + None, + None, + dvcfiles=dvcfiles, + dvcignore=dvcignore, + **kwargs, + ) def walk(self, top, topdown=True, **kwargs): """Walk and merge both DVC and repo fss. @@ -437,21 +460,36 @@ def walk(self, top, topdown=True, **kwargs): return repo = self._get_repo(os.path.abspath(top)) + dvcignore = repo.dvcignore dvcfiles = kwargs.pop("dvcfiles", False) fs, dvc_fs, dvc_path = self._get_fs_pair(top) repo_exists = fs.exists(top) - repo_walk = repo.dvcignore.walk(fs, top, topdown=topdown, **kwargs) - if not dvc_fs or (repo_exists and dvc_fs.isdvc(dvc_path)): - yield from self._walk(repo_walk, None, None, dvcfiles=dvcfiles) + yield from self._walk( + fs, + top, + None, + None, + dvcfiles=dvcfiles, + dvcignore=dvcignore, + **kwargs, + ) return if not repo_exists: yield from _wrap_walk(dvc_fs, dvc_path, topdown=topdown, **kwargs) - yield from self._walk(repo_walk, dvc_fs, dvc_path, dvcfiles=dvcfiles) + yield from self._walk( + fs, + top, + dvc_fs, + dvc_path, + dvcfiles=dvcfiles, + dvcignore=dvcignore, + **kwargs, + ) def find(self, path, prefix=None): for root, _, files in self.walk(path): From 4a6407bce0ea05a0522f10dd4b3da52583a1d31e Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Thu, 7 Apr 2022 21:51:11 +0300 Subject: [PATCH 2/8] repofs: don't assume repofs paths are the same as underlying fs paths --- dvc/fs/repo.py | 60 +++++++++++++++++--------------------- tests/unit/fs/test_repo.py | 38 ++++++++++++------------ 2 files changed, 46 insertions(+), 52 deletions(-) diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index c8c71a5178..c8de8c32b0 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -233,7 +233,7 @@ def _get_fs_pair( else: dvc_path = path - return repo.fs, dvc_fs, dvc_path + return repo.fs, path, dvc_fs, dvc_path def open( self, path, mode="r", encoding="utf-8", **kwargs @@ -241,9 +241,9 @@ def open( if "b" in mode: encoding = None - fs, dvc_fs, dvc_path = self._get_fs_pair(path) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) try: - return fs.open(path, mode=mode, encoding=encoding) + return fs.open(fs_path, mode=mode, encoding=encoding) except FileNotFoundError: if not dvc_fs: raise @@ -251,12 +251,10 @@ def open( return dvc_fs.open(dvc_path, mode=mode, encoding=encoding, **kwargs) def exists(self, path) -> bool: - path = os.path.abspath(path) - - fs, dvc_fs, dvc_path = self._get_fs_pair(path) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) if not dvc_fs: - return fs.exists(path) + return fs.exists(fs_path) if dvc_fs.repo.dvcignore.is_ignored(fs, path): return False @@ -267,7 +265,7 @@ def exists(self, path) -> bool: if not dvc_fs.exists(dvc_path): return False - for p in self.path.parents(path): + for p in self.path.parents(fs_path): try: if fs.info(p)["type"] != "directory": return False @@ -277,15 +275,13 @@ def exists(self, path) -> bool: return True def isdir(self, path): # pylint: disable=arguments-renamed - path = os.path.abspath(path) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - fs, dvc_fs, dvc_path = self._get_fs_pair(path) - - if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_dir(path): + if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_dir(fs_path): return False try: - info = fs.info(path) + info = fs.info(fs_path) return info["type"] == "directory" except (OSError, ValueError): # from CPython's os.path.isdir() @@ -299,7 +295,7 @@ def isdir(self, path): # pylint: disable=arguments-renamed except FileNotFoundError: return False - for p in self.path.parents(path): + for p in fs.path.parents(fs_path): try: if fs.info(p)["type"] != "directory": return False @@ -309,19 +305,17 @@ def isdir(self, path): # pylint: disable=arguments-renamed return info["type"] == "directory" def isdvc(self, path, **kwargs): - _, dvc_fs, dvc_path = self._get_fs_pair(path) + _, _, dvc_fs, dvc_path = self._get_fs_pair(path) return dvc_fs is not None and dvc_fs.isdvc(dvc_path, **kwargs) def isfile(self, path): # pylint: disable=arguments-renamed - path = os.path.abspath(path) - - fs, dvc_fs, dvc_path = self._get_fs_pair(path) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_file(path): + if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_file(fs_path): return False try: - info = fs.info(path) + info = fs.info(fs_path) return info["type"] == "file" except (OSError, ValueError): # from CPython's os.path.isfile() @@ -335,7 +329,7 @@ def isfile(self, path): # pylint: disable=arguments-renamed except FileNotFoundError: return False - for p in self.path.parents(path): + for p in fs.path.parents(fs_path): try: if fs.info(p)["type"] != "directory": return False @@ -350,8 +344,8 @@ def _subrepo_walk(self, dir_path, **kwargs): NOTE: subrepo will only be discovered when walking if ignore_subrepos is set to False. """ - fs, dvc_fs, dvc_path = self._get_fs_pair(dir_path) - yield from self._walk(fs, dir_path, dvc_fs, dvc_path, **kwargs) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(dir_path) + yield from self._walk(fs, fs_path, dvc_fs, dvc_path, **kwargs) def _walk( self, @@ -463,13 +457,13 @@ def walk(self, top, topdown=True, **kwargs): dvcignore = repo.dvcignore dvcfiles = kwargs.pop("dvcfiles", False) - fs, dvc_fs, dvc_path = self._get_fs_pair(top) - repo_exists = fs.exists(top) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(top) + repo_exists = fs.exists(fs_path) if not dvc_fs or (repo_exists and dvc_fs.isdvc(dvc_path)): yield from self._walk( fs, - top, + fs_path, None, None, dvcfiles=dvcfiles, @@ -483,7 +477,7 @@ def walk(self, top, topdown=True, **kwargs): yield from self._walk( fs, - top, + fs_path, dvc_fs, dvc_path, dvcfiles=dvcfiles, @@ -499,10 +493,10 @@ def find(self, path, prefix=None): def get_file( self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs ): - fs, dvc_fs, dvc_path = self._get_fs_pair(from_info) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(from_info) try: fs.get_file( # pylint: disable=protected-access - from_info, to_file, callback=callback, **kwargs + fs_path, to_file, callback=callback, **kwargs ) return except FileNotFoundError: @@ -514,7 +508,7 @@ def get_file( ) def info(self, path): - fs, dvc_fs, dvc_path = self._get_fs_pair(path) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) try: dvc_info = dvc_fs.info(dvc_path) @@ -524,7 +518,7 @@ def info(self, path): try: from dvc.utils import is_exec - fs_info = fs.info(path) + fs_info = fs.info(fs_path) fs_info["repo"] = dvc_fs.repo fs_info["isout"] = ( dvc_info.get("isout", False) if dvc_info else False @@ -550,9 +544,9 @@ def info(self, path): return dvc_info def checksum(self, path): - fs, dvc_fs, dvc_path = self._get_fs_pair(path) + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) try: - return fs.checksum(path) + return fs.checksum(fs_path) except FileNotFoundError: return dvc_fs.checksum(dvc_path) diff --git a/tests/unit/fs/test_repo.py b/tests/unit/fs/test_repo.py index 24b241fcfb..cdf22e0aea 100644 --- a/tests/unit/fs/test_repo.py +++ b/tests/unit/fs/test_repo.py @@ -194,13 +194,13 @@ def test_walk(tmp_dir, dvc, dvcfiles, extra_expected): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "subdir1"), - os.path.join("dir", "subdir2"), - os.path.join("dir", "subdir1", "foo1"), - os.path.join("dir", "subdir1", "bar1"), - os.path.join("dir", "subdir2", "foo2"), - os.path.join("dir", "foo"), - os.path.join("dir", "bar"), + os.path.join(tmp_dir, "dir", "subdir1"), + os.path.join(tmp_dir, "dir", "subdir2"), + os.path.join(tmp_dir, "dir", "subdir1", "foo1"), + os.path.join(tmp_dir, "dir", "subdir1", "bar1"), + os.path.join(tmp_dir, "dir", "subdir2", "foo2"), + os.path.join(tmp_dir, "dir", "foo"), + os.path.join(tmp_dir, "dir", "bar"), ] actual = [] @@ -208,7 +208,7 @@ def test_walk(tmp_dir, dvc, dvcfiles, extra_expected): for entry in dirs + files: actual.append(os.path.join(root, entry)) - expected += extra_expected + expected += [os.path.join(tmp_dir, path) for path in extra_expected] assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -228,14 +228,14 @@ def test_walk_dirty(tmp_dir, dvc): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "subdir1"), - os.path.join("dir", "subdir2"), - os.path.join("dir", "subdir3"), - os.path.join("dir", "subdir1", "foo1"), - os.path.join("dir", "subdir1", "bar1"), - os.path.join("dir", "subdir2", "foo2"), - os.path.join("dir", "subdir3", "foo3"), - os.path.join("dir", "bar"), + os.path.join(tmp_dir, "dir", "subdir1"), + os.path.join(tmp_dir, "dir", "subdir2"), + os.path.join(tmp_dir, "dir", "subdir3"), + os.path.join(tmp_dir, "dir", "subdir1", "foo1"), + os.path.join(tmp_dir, "dir", "subdir1", "bar1"), + os.path.join(tmp_dir, "dir", "subdir2", "foo2"), + os.path.join(tmp_dir, "dir", "subdir3", "foo3"), + os.path.join(tmp_dir, "dir", "bar"), ] actual = [] @@ -278,9 +278,9 @@ def test_walk_mixed_dir(tmp_dir, scm, dvc): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "foo"), - os.path.join("dir", "bar"), - os.path.join("dir", ".gitignore"), + os.path.join(tmp_dir, "dir", "foo"), + os.path.join(tmp_dir, "dir", "bar"), + os.path.join(tmp_dir, "dir", ".gitignore"), ] actual = [] for root, dirs, files in fs.walk("dir"): From 07595ac2400327badc2f74934fa9b810aa1691d6 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Fri, 8 Apr 2022 19:33:26 +0300 Subject: [PATCH 3/8] repofs: use fs.ls --- dvc/fs/git.py | 3 +++ dvc/fs/local.py | 3 +++ dvc/fs/repo.py | 28 ++++++++++++++++------------ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/dvc/fs/git.py b/dvc/fs/git.py index dd830ea144..13c43d5547 100644 --- a/dvc/fs/git.py +++ b/dvc/fs/git.py @@ -50,3 +50,6 @@ def fs(self) -> "FsspecGitFileSystem": @property def rev(self) -> str: return self.fs.rev + + def ls(self, path, **kwargs): + return self.fs.ls(path, **kwargs) or [] diff --git a/dvc/fs/local.py b/dvc/fs/local.py index 8bbfa2123d..1595318006 100644 --- a/dvc/fs/local.py +++ b/dvc/fs/local.py @@ -147,6 +147,9 @@ def reflink(self, from_info, to_info): def info(self, path): return self.fs.info(path) + def ls(self, path, **kwargs): + return self.fs.ls(path, **kwargs) + def put_file( self, from_file, to_info, callback=DEFAULT_CALLBACK, **kwargs ): diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index c8de8c32b0..a15db1c98f 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -28,13 +28,12 @@ def _ls(fs, path): dnames = [] fnames = [] - with suppress(FileNotFoundError): - for entry in fs.ls(path, detail=True): - name = fs.path.name(entry["name"]) - if entry["type"] == "directory": - dnames.append(name) - else: - fnames.append(name) + for entry in fs.ls(path, detail=True): + name = fs.path.name(entry["name"]) + if entry["type"] == "directory": + dnames.append(name) + else: + fnames.append(name) return dnames, fnames @@ -362,15 +361,20 @@ def _walk( assert dvcignore - dvc_dirs, dvc_fnames = _ls(dvc_fs, dvc_path) if dvc_fs else ([], []) + dvc_dirs, dvc_fnames = [], [] + if dvc_fs: + with suppress(FileNotFoundError): + dvc_dirs, dvc_fnames = _ls(dvc_fs, dvc_path) try: - _, repo_dirs, repo_fnames = next( - dvcignore.walk(fs, fs_path, **kwargs) - ) - except StopIteration: + repo_dirs, repo_fnames = _ls(fs, fs_path) + except FileNotFoundError: return + repo_dirs, repo_fnames = dvcignore( + fs_path, repo_dirs, repo_fnames, **kwargs + ) + # separate subdirs into shared dirs, dvc-only dirs, repo-only dirs dvc_set = set(dvc_dirs) repo_set = set(repo_dirs) From b7db869cc78d6c3c512a9296dabf30576e494ba8 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Sat, 9 Apr 2022 16:31:27 +0300 Subject: [PATCH 4/8] repofs: base walk on ls --- dvc/fs/fsspec_wrapper.py | 2 +- dvc/fs/repo.py | 223 ++++++++++++++----------------------- dvc/ignore.py | 19 +++- tests/unit/fs/test_repo.py | 41 +++---- 4 files changed, 123 insertions(+), 162 deletions(-) diff --git a/dvc/fs/fsspec_wrapper.py b/dvc/fs/fsspec_wrapper.py index 85efea57d6..89a3e5ac6c 100644 --- a/dvc/fs/fsspec_wrapper.py +++ b/dvc/fs/fsspec_wrapper.py @@ -91,7 +91,7 @@ def ls(self, path: AnyFSPath, detail: "Literal[False]") -> Iterator[str]: ... def ls(self, path, detail=False): - yield from self.fs.ls(path, detail=detail) + return self.fs.ls(path, detail=detail) def find(self, path, prefix=None): yield from self.fs.find(path) diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index a15db1c98f..a9ca0abd3e 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -5,7 +5,7 @@ from itertools import takewhile from typing import TYPE_CHECKING, Callable, Optional, Tuple, Type, Union -from funcy import lfilter, wrap_with +from funcy import wrap_with from ._callback import DEFAULT_CALLBACK from .base import FileSystem @@ -38,6 +38,29 @@ def _ls(fs, path): return dnames, fnames +def _merge_info(repo, fs_info, dvc_info): + from dvc.utils import is_exec + + if not fs_info: + dvc_info["repo"] = repo + dvc_info["isdvc"] = True + return dvc_info + + fs_info["repo"] = repo + fs_info["isout"] = dvc_info.get("isout", False) if dvc_info else False + fs_info["outs"] = dvc_info["outs"] if dvc_info else None + fs_info["isdvc"] = dvc_info["isdvc"] if dvc_info else False + fs_info["meta"] = dvc_info.get("meta") if dvc_info else None + + isexec = False + if dvc_info: + isexec = dvc_info["isexec"] + elif fs_info["type"] == "file": + isexec = is_exec(fs_info["mode"]) + fs_info["isexec"] = isexec + return fs_info + + class RepoFileSystem(FileSystem): # pylint:disable=abstract-method """DVC + git-tracked files fs. @@ -249,13 +272,13 @@ def open( return dvc_fs.open(dvc_path, mode=mode, encoding=encoding, **kwargs) - def exists(self, path) -> bool: + def exists(self, path, **kwargs) -> bool: fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) if not dvc_fs: return fs.exists(fs_path) - if dvc_fs.repo.dvcignore.is_ignored(fs, path): + if dvc_fs.repo.dvcignore.is_ignored(fs, path, **kwargs): return False if fs.exists(path): @@ -273,10 +296,10 @@ def exists(self, path) -> bool: return True - def isdir(self, path): # pylint: disable=arguments-renamed + def isdir(self, path, **kwargs): # pylint: disable=arguments-renamed fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_dir(fs_path): + if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_dir(fs_path, **kwargs): return False try: @@ -307,10 +330,10 @@ def isdvc(self, path, **kwargs): _, _, dvc_fs, dvc_path = self._get_fs_pair(path) return dvc_fs is not None and dvc_fs.isdvc(dvc_path, **kwargs) - def isfile(self, path): # pylint: disable=arguments-renamed + def isfile(self, path, **kwargs): # pylint: disable=arguments-renamed fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_file(fs_path): + if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_file(fs_path, **kwargs): return False try: @@ -337,53 +360,39 @@ def isfile(self, path): # pylint: disable=arguments-renamed return info["type"] == "file" - def _subrepo_walk(self, dir_path, **kwargs): - """Walk into a new repo. - - NOTE: subrepo will only be discovered when walking if - ignore_subrepos is set to False. - """ - fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(dir_path) - yield from self._walk(fs, fs_path, dvc_fs, dvc_path, **kwargs) - - def _walk( - self, - fs, - fs_path, - dvc_fs, - dvc_path, - dvcfiles=False, - dvcignore=None, - **kwargs, - ): - from dvc.dvcfile import is_valid_filename - from dvc.ignore import DvcIgnore + def ls(self, path, detail=True, **kwargs): + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - assert dvcignore + repo = self._get_repo(os.path.abspath(path)) + dvcignore = repo.dvcignore + ignore_subrepos = kwargs.get("ignore_subrepos", True) - dvc_dirs, dvc_fnames = [], [] + dvc_entries = [] if dvc_fs: with suppress(FileNotFoundError): - dvc_dirs, dvc_fnames = _ls(dvc_fs, dvc_path) + dvc_entries = dvc_fs.ls(dvc_path, detail=True) + fs_entries = [] try: - repo_dirs, repo_fnames = _ls(fs, fs_path) - except FileNotFoundError: - return + fs_entries = dvcignore.ls( + fs, fs_path, detail=True, ignore_subrepos=ignore_subrepos + ) + except (FileNotFoundError, NotADirectoryError): + pass - repo_dirs, repo_fnames = dvcignore( - fs_path, repo_dirs, repo_fnames, **kwargs - ) + def _to_dict(fs, entries): + return {fs.path.name(entry["name"]): entry for entry in entries} - # separate subdirs into shared dirs, dvc-only dirs, repo-only dirs - dvc_set = set(dvc_dirs) - repo_set = set(repo_dirs) - dvc_only = list(dvc_set - repo_set) - repo_only = list(repo_set - dvc_set) - shared = list(dvc_set & repo_set) - dirs = shared + dvc_only + repo_only + dvc_dict = _to_dict(dvc_fs, dvc_entries) + fs_dict = _to_dict(fs, fs_entries) + overlay = {**dvc_dict, **fs_dict} + + dvcfiles = kwargs.get("dvcfiles", False) def _func(fname): + from dvc.dvcfile import is_valid_filename + from dvc.ignore import DvcIgnore + if dvcfiles: return True @@ -391,51 +400,15 @@ def _func(fname): is_valid_filename(fname) or fname == DvcIgnore.DVCIGNORE_FILE ) - # merge file lists - files = set(filter(_func, dvc_fnames + repo_fnames)) - - yield fs_path, dirs, list(files) + names = filter(_func, overlay.keys()) - def is_dvc_repo(d): - return self._is_dvc_repo(os.path.join(fs_path, d)) + if not detail: + return list(names) - # remove subrepos to prevent it from being traversed - subrepos = set(filter(is_dvc_repo, repo_only)) - # set dir order for next recursion level - shared dirs first so that - # next() for both generators recurses into the same shared directory - dvc_dirs[:] = [dirname for dirname in dirs if dirname in dvc_set] - repo_dirs[:] = lfilter(lambda d: d in (repo_set - subrepos), dirs) - - for dirname in dirs: - if dirname in subrepos: - dir_path = os.path.join(fs_path, dirname) - yield from self._subrepo_walk( - dir_path, dvcfiles=dvcfiles, dvcignore=dvcignore, **kwargs - ) - elif dirname in shared: - yield from self._walk( - fs, - fs.path.join(fs_path, dirname), - dvc_fs, - dvc_fs.path.join(dvc_path, dirname), - dvcfiles=dvcfiles, - dvcignore=dvcignore, - **kwargs, - ) - elif dirname in dvc_set: - yield from _wrap_walk( - dvc_fs, dvc_fs.path.join(dvc_path, dirname) - ) - elif dirname in repo_set: - yield from self._walk( - fs, - fs.path.join(fs_path, dirname), - None, - None, - dvcfiles=dvcfiles, - dvcignore=dvcignore, - **kwargs, - ) + return [ + _merge_info(dvc_fs.repo, fs_dict.get(name), dvc_dict.get(name)) + for name in names + ] def walk(self, top, topdown=True, **kwargs): """Walk and merge both DVC and repo fss. @@ -449,45 +422,34 @@ def walk(self, top, topdown=True, **kwargs): Any kwargs will be passed into methods used for fetching and/or streaming DVC outs from remotes. """ - assert topdown + dirs = [] + nondirs = [] + + ignore_subrepos = kwargs.get("ignore_subrepos", True) - if not self.exists(top): + if not self.exists(top, ignore_subrepos=ignore_subrepos): return - if not self.isdir(top): + if not self.isdir(top, ignore_subrepos=ignore_subrepos): return - repo = self._get_repo(os.path.abspath(top)) - dvcignore = repo.dvcignore - dvcfiles = kwargs.pop("dvcfiles", False) - - fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(top) - repo_exists = fs.exists(fs_path) - - if not dvc_fs or (repo_exists and dvc_fs.isdvc(dvc_path)): - yield from self._walk( - fs, - fs_path, - None, - None, - dvcfiles=dvcfiles, - dvcignore=dvcignore, - **kwargs, + for entry in self.ls(top, **kwargs): + name = self.path.name(entry["name"]) + if entry["type"] == "directory": + dirs.append(name) + else: + nondirs.append(name) + + if topdown: + yield top, dirs, nondirs + + for dname in dirs: + yield from self.walk( + self.path.join(top, dname), topdown=topdown, **kwargs ) - return - if not repo_exists: - yield from _wrap_walk(dvc_fs, dvc_path, topdown=topdown, **kwargs) - - yield from self._walk( - fs, - fs_path, - dvc_fs, - dvc_path, - dvcfiles=dvcfiles, - dvcignore=dvcignore, - **kwargs, - ) + if not topdown: + yield top, dirs, nondirs def find(self, path, prefix=None): for root, _, files in self.walk(path): @@ -520,32 +482,13 @@ def info(self, path): dvc_info = None try: - from dvc.utils import is_exec - fs_info = fs.info(fs_path) - fs_info["repo"] = dvc_fs.repo - fs_info["isout"] = ( - dvc_info.get("isout", False) if dvc_info else False - ) - fs_info["outs"] = dvc_info["outs"] if dvc_info else None - fs_info["isdvc"] = dvc_info["isdvc"] if dvc_info else False - fs_info["meta"] = dvc_info.get("meta") if dvc_info else None - - isexec = False - if dvc_info: - isexec = dvc_info["isexec"] - elif fs_info["type"] == "file": - isexec = is_exec(fs_info["mode"]) - fs_info["isexec"] = isexec - return fs_info - except FileNotFoundError: if not dvc_info: raise + fs_info = None - dvc_info["repo"] = dvc_fs.repo - dvc_info["isdvc"] = True - return dvc_info + return _merge_info(dvc_fs.repo, fs_info, dvc_info) def checksum(self, path): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) diff --git a/dvc/ignore.py b/dvc/ignore.py index dd60a96432..b3f9eb4a99 100644 --- a/dvc/ignore.py +++ b/dvc/ignore.py @@ -2,7 +2,7 @@ import os import re from collections import namedtuple -from itertools import groupby, takewhile +from itertools import groupby, takewhile, chain from pathspec.patterns import GitWildMatchPattern from pathspec.util import normalize_file @@ -267,6 +267,23 @@ def __call__(self, root, dirs, files, ignore_subrepos=True): dirs, files = ignore_pattern(abs_root, dirs, files) return dirs, files + def ls(self, fs, path, detail=True, **kwargs): + fs_dict = {} + dirs = [] + nondirs = [] + + for entry in fs.ls(path, detail=True, **kwargs): + name = fs.path.name(entry["name"]) + fs_dict[name] = entry + if entry["type"] == "directory": + dirs.append(name) + else: + nondirs.append(name) + + dirs, nondirs = self(path, dirs, nondirs, **kwargs) + + return [fs_dict[name] for name in chain(dirs, nondirs)] + def walk(self, fs: FileSystem, path: AnyPath, **kwargs): ignore_subrepos = kwargs.pop("ignore_subrepos", True) if fs.scheme == Schemes.LOCAL: diff --git a/tests/unit/fs/test_repo.py b/tests/unit/fs/test_repo.py index cdf22e0aea..c40be8c93d 100644 --- a/tests/unit/fs/test_repo.py +++ b/tests/unit/fs/test_repo.py @@ -194,13 +194,13 @@ def test_walk(tmp_dir, dvc, dvcfiles, extra_expected): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join(tmp_dir, "dir", "subdir1"), - os.path.join(tmp_dir, "dir", "subdir2"), - os.path.join(tmp_dir, "dir", "subdir1", "foo1"), - os.path.join(tmp_dir, "dir", "subdir1", "bar1"), - os.path.join(tmp_dir, "dir", "subdir2", "foo2"), - os.path.join(tmp_dir, "dir", "foo"), - os.path.join(tmp_dir, "dir", "bar"), + os.path.join("dir", "subdir1"), + os.path.join("dir", "subdir2"), + os.path.join("dir", "subdir1", "foo1"), + os.path.join("dir", "subdir1", "bar1"), + os.path.join("dir", "subdir2", "foo2"), + os.path.join("dir", "foo"), + os.path.join("dir", "bar"), ] actual = [] @@ -208,7 +208,7 @@ def test_walk(tmp_dir, dvc, dvcfiles, extra_expected): for entry in dirs + files: actual.append(os.path.join(root, entry)) - expected += [os.path.join(tmp_dir, path) for path in extra_expected] + expected += [os.path.join(path) for path in extra_expected] assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -228,14 +228,15 @@ def test_walk_dirty(tmp_dir, dvc): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join(tmp_dir, "dir", "subdir1"), - os.path.join(tmp_dir, "dir", "subdir2"), - os.path.join(tmp_dir, "dir", "subdir3"), - os.path.join(tmp_dir, "dir", "subdir1", "foo1"), - os.path.join(tmp_dir, "dir", "subdir1", "bar1"), - os.path.join(tmp_dir, "dir", "subdir2", "foo2"), - os.path.join(tmp_dir, "dir", "subdir3", "foo3"), - os.path.join(tmp_dir, "dir", "bar"), + os.path.join("dir", "subdir1"), + os.path.join("dir", "subdir2"), + os.path.join("dir", "subdir3"), + os.path.join("dir", "subdir1", "foo1"), + os.path.join("dir", "subdir1", "bar1"), + os.path.join("dir", "subdir2", "foo2"), + os.path.join("dir", "subdir3", "foo3"), + os.path.join("dir", "bar"), + os.path.join("dir", "foo"), ] actual = [] @@ -260,7 +261,7 @@ def test_walk_dirty_cached_dir(tmp_dir, scm, dvc): for entry in dirs + files: actual.append(os.path.join(root, entry)) - assert actual == [(data / "bar").fs_path] + assert actual == [(data / "bar").fs_path, (data / "foo").fs_path] def test_walk_mixed_dir(tmp_dir, scm, dvc): @@ -278,9 +279,9 @@ def test_walk_mixed_dir(tmp_dir, scm, dvc): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join(tmp_dir, "dir", "foo"), - os.path.join(tmp_dir, "dir", "bar"), - os.path.join(tmp_dir, "dir", ".gitignore"), + os.path.join("dir", "foo"), + os.path.join("dir", "bar"), + os.path.join("dir", ".gitignore"), ] actual = [] for root, dirs, files in fs.walk("dir"): From b056642d0e23a8d5d5bd8e70799479e019cee93d Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Mon, 11 Apr 2022 15:39:56 +0300 Subject: [PATCH 5/8] repofs: unify ls/walk/info/exists/etc behaviour --- dvc/fs/repo.py | 143 +++++++++++++------------------------ dvc/ignore.py | 15 ++-- dvc/repo/ls.py | 6 +- tests/func/test_ls.py | 1 - tests/unit/fs/test_repo.py | 4 +- 5 files changed, 64 insertions(+), 105 deletions(-) diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index a9ca0abd3e..3551b549f2 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -273,120 +273,49 @@ def open( return dvc_fs.open(dvc_path, mode=mode, encoding=encoding, **kwargs) def exists(self, path, **kwargs) -> bool: - fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - - if not dvc_fs: - return fs.exists(fs_path) - - if dvc_fs.repo.dvcignore.is_ignored(fs, path, **kwargs): - return False - - if fs.exists(path): + try: + self.info(path, **kwargs) return True - - if not dvc_fs.exists(dvc_path): + except FileNotFoundError: return False - for p in self.path.parents(fs_path): - try: - if fs.info(p)["type"] != "directory": - return False - except FileNotFoundError: - continue - - return True - def isdir(self, path, **kwargs): # pylint: disable=arguments-renamed - fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - - if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_dir(fs_path, **kwargs): - return False - try: - info = fs.info(fs_path) - return info["type"] == "directory" - except (OSError, ValueError): - # from CPython's os.path.isdir() - pass - - if not dvc_fs: - return False - - try: - info = dvc_fs.info(dvc_path) + return self.info(path, **kwargs)["type"] == "directory" except FileNotFoundError: return False - for p in fs.path.parents(fs_path): - try: - if fs.info(p)["type"] != "directory": - return False - except FileNotFoundError: - continue - - return info["type"] == "directory" - def isdvc(self, path, **kwargs): _, _, dvc_fs, dvc_path = self._get_fs_pair(path) return dvc_fs is not None and dvc_fs.isdvc(dvc_path, **kwargs) def isfile(self, path, **kwargs): # pylint: disable=arguments-renamed - fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - - if dvc_fs and dvc_fs.repo.dvcignore.is_ignored_file(fs_path, **kwargs): - return False - - try: - info = fs.info(fs_path) - return info["type"] == "file" - except (OSError, ValueError): - # from CPython's os.path.isfile() - pass - - if not dvc_fs: - return False - try: - info = dvc_fs.info(dvc_path) + return self.info(path, **kwargs)["type"] == "file" except FileNotFoundError: return False - for p in fs.path.parents(fs_path): - try: - if fs.info(p)["type"] != "directory": - return False - except FileNotFoundError: - continue - - return info["type"] == "file" - def ls(self, path, detail=True, **kwargs): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - repo = self._get_repo(os.path.abspath(path)) + repo = dvc_fs.repo if dvc_fs else self._main_repo dvcignore = repo.dvcignore ignore_subrepos = kwargs.get("ignore_subrepos", True) - dvc_entries = [] + names = set() if dvc_fs: with suppress(FileNotFoundError): - dvc_entries = dvc_fs.ls(dvc_path, detail=True) + for entry in dvc_fs.ls(dvc_path, detail=False): + names.add(dvc_fs.path.name(entry)) - fs_entries = [] try: - fs_entries = dvcignore.ls( - fs, fs_path, detail=True, ignore_subrepos=ignore_subrepos - ) + for entry in dvcignore.ls( + fs, fs_path, detail=False, ignore_subrepos=ignore_subrepos + ): + names.add(fs.path.name(entry)) except (FileNotFoundError, NotADirectoryError): pass - def _to_dict(fs, entries): - return {fs.path.name(entry["name"]): entry for entry in entries} - - dvc_dict = _to_dict(dvc_fs, dvc_entries) - fs_dict = _to_dict(fs, fs_entries) - overlay = {**dvc_dict, **fs_dict} - dvcfiles = kwargs.get("dvcfiles", False) def _func(fname): @@ -400,15 +329,23 @@ def _func(fname): is_valid_filename(fname) or fname == DvcIgnore.DVCIGNORE_FILE ) - names = filter(_func, overlay.keys()) + names = filter(_func, names) + + infos = [] + paths = [] + for name in names: + entry_path = self.path.join(path, name) + try: + info = self.info(entry_path, ignore_subrepos=ignore_subrepos) + except FileNotFoundError: + continue + infos.append(info) + paths.append(entry_path) if not detail: - return list(names) + return paths - return [ - _merge_info(dvc_fs.repo, fs_dict.get(name), dvc_dict.get(name)) - for name in names - ] + return infos def walk(self, top, topdown=True, **kwargs): """Walk and merge both DVC and repo fss. @@ -473,9 +410,13 @@ def get_file( dvc_path, to_file, callback=callback, **kwargs ) - def info(self, path): + def info(self, path, **kwargs): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) + repo = dvc_fs.repo if dvc_fs else self._main_repo + dvcignore = repo.dvcignore + ignore_subrepos = kwargs.get("ignore_subrepos", True) + try: dvc_info = dvc_fs.info(dvc_path) except FileNotFoundError: @@ -483,11 +424,29 @@ def info(self, path): try: fs_info = fs.info(fs_path) - except FileNotFoundError: + if dvcignore.is_ignored( + fs, fs_path, ignore_subrepos=ignore_subrepos + ): + fs_info = None + except (FileNotFoundError, NotADirectoryError): if not dvc_info: raise fs_info = None + # NOTE: if some parent in fs_path turns out to be a file, it means + # that that whole repofs branch doesn't exist. + if not fs_info and dvc_info: + for parent in self.path.parents(fs_path): + try: + if fs.info(parent)["type"] != "directory": + dvc_info = None + break + except FileNotFoundError: + continue + + if not dvc_info and not fs_info: + raise FileNotFoundError + return _merge_info(dvc_fs.repo, fs_info, dvc_info) def checksum(self, path): diff --git a/dvc/ignore.py b/dvc/ignore.py index b3f9eb4a99..eaff9768d1 100644 --- a/dvc/ignore.py +++ b/dvc/ignore.py @@ -2,7 +2,7 @@ import os import re from collections import namedtuple -from itertools import groupby, takewhile, chain +from itertools import chain, groupby, takewhile from pathspec.patterns import GitWildMatchPattern from pathspec.util import normalize_file @@ -282,6 +282,9 @@ def ls(self, fs, path, detail=True, **kwargs): dirs, nondirs = self(path, dirs, nondirs, **kwargs) + if not detail: + return dirs + nondirs + return [fs_dict[name] for name in chain(dirs, nondirs)] def walk(self, fs: FileSystem, path: AnyPath, **kwargs): @@ -354,10 +357,10 @@ def is_ignored_dir(self, path: str, ignore_subrepos: bool = True) -> bool: return self._is_ignored(path, True, ignore_subrepos=ignore_subrepos) - def is_ignored_file(self, path: str) -> bool: + def is_ignored_file(self, path: str, ignore_subrepos: bool = True) -> bool: "Only used in LocalFileSystem" path = os.path.abspath(path) - return self._is_ignored(path, False) + return self._is_ignored(path, False, ignore_subrepos=ignore_subrepos) def _outside_repo(self, path): # paths outside of the repo should be ignored @@ -395,12 +398,12 @@ def is_ignored( if fs.scheme != Schemes.LOCAL: return False if fs.isfile(path): - return self.is_ignored_file(path) + return self.is_ignored_file(path, ignore_subrepos) if fs.isdir(path): return self.is_ignored_dir(path, ignore_subrepos) - return self.is_ignored_file(path) or self.is_ignored_dir( + return self.is_ignored_file( path, ignore_subrepos - ) + ) or self.is_ignored_dir(path, ignore_subrepos) def init(path): diff --git a/dvc/repo/ls.py b/dvc/repo/ls.py index 1f76720206..083b7aedc3 100644 --- a/dvc/repo/ls.py +++ b/dvc/repo/ls.py @@ -60,11 +60,7 @@ def _ls(repo, fs_path, recursive=None, dvc_only=False): ret = {} for info in infos: - try: - _info = fs.info(info) - except FileNotFoundError: - # broken symlink - _info = {"type": "file", "isexec": False} + _info = fs.info(info) if _info.get("outs") or not dvc_only: path = ( diff --git a/tests/func/test_ls.py b/tests/func/test_ls.py index 4701375529..a08c92dd1b 100644 --- a/tests/func/test_ls.py +++ b/tests/func/test_ls.py @@ -578,5 +578,4 @@ def test_broken_symlink(tmp_dir, dvc): "isexec": False, "path": ".dvcignore", }, - {"isout": False, "isdir": False, "isexec": False, "path": "link"}, ] diff --git a/tests/unit/fs/test_repo.py b/tests/unit/fs/test_repo.py index c40be8c93d..b0c6d178e2 100644 --- a/tests/unit/fs/test_repo.py +++ b/tests/unit/fs/test_repo.py @@ -261,7 +261,9 @@ def test_walk_dirty_cached_dir(tmp_dir, scm, dvc): for entry in dirs + files: actual.append(os.path.join(root, entry)) - assert actual == [(data / "bar").fs_path, (data / "foo").fs_path] + expected = [(data / "foo").fs_path, (data / "bar").fs_path] + assert set(actual) == set(expected) + assert len(actual) == len(expected) def test_walk_mixed_dir(tmp_dir, scm, dvc): From 82e4a05d4c85354f9376110031015aff6076889d Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Mon, 11 Apr 2022 22:26:21 +0300 Subject: [PATCH 6/8] repofs: use repo paths --- dvc/api.py | 5 +- dvc/data/reference.py | 9 +- dvc/dependency/repo.py | 15 +- dvc/fs/path.py | 3 +- dvc/fs/repo.py | 94 ++++++----- dvc/output.py | 5 + dvc/repo/__init__.py | 1 - dvc/repo/collect.py | 24 +-- dvc/repo/get.py | 5 +- dvc/repo/ls.py | 4 +- dvc/repo/metrics/show.py | 5 +- dvc/repo/plots/__init__.py | 4 +- dvc/repo/reproduce.py | 2 +- tests/func/experiments/test_checkpoints.py | 20 +-- tests/func/experiments/test_experiments.py | 4 +- tests/func/test_external_repo.py | 8 +- tests/func/test_fs.py | 6 +- tests/unit/fs/test_repo.py | 178 ++++++++++----------- tests/unit/fs/test_repo_info.py | 16 +- tests/unit/test_external_repo.py | 4 +- 20 files changed, 213 insertions(+), 199 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index e31d4506a7..28b5c04384 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -19,15 +19,14 @@ def get_url(path, repo=None, rev=None, remote=None): directory in the remote storage. """ with Repo.open(repo, rev=rev, subrepos=True, uninitialized=True) as _repo: - fs_path = _repo.fs.path.join(_repo.root_dir, path) with reraise(FileNotFoundError, PathMissingError(path, repo)): - info = _repo.repo_fs.info(fs_path) + info = _repo.repo_fs.info(path) if not info["isdvc"]: raise OutputNotFoundError(path, repo) cloud = info["repo"].cloud - dvc_path = _repo.fs.path.relpath(fs_path, info["repo"].root_dir) + _, _, dvc_fs, dvc_path = _repo.repo_fs._get_fs_pair(path) if not os.path.isabs(path): dvc_path = dvc_path.replace("\\", "/") diff --git a/dvc/data/reference.py b/dvc/data/reference.py index a3438e390a..0cd28274e3 100644 --- a/dvc/data/reference.py +++ b/dvc/data/reference.py @@ -30,6 +30,7 @@ def __init__( checksum: Optional[str] = None, **kwargs, ): + from dvc.fs.repo import RepoFileSystem super().__init__(fs_path, fs, hash_info, **kwargs) self.checksum = checksum or fs.checksum(fs_path) @@ -60,9 +61,6 @@ def to_bytes(self): # ReferenceHashFiles should currently only be serialized in # memory and not to disk fs_path = self.fs_path - if isinstance(self.fs, RepoFileSystem): - fs_path = self.fs.path.relpath(fs_path, self.fs.root_dir) - dict_ = { self.PARAM_PATH: fs_path, self.PARAM_HASH: self.hash_info, @@ -95,11 +93,10 @@ def from_bytes(cls, data: bytes, fs_cache: Optional[dict] = None): if not fs: config = dict(config_pairs) if RepoFileSystem.PARAM_REPO_URL in config: - fs = RepoFileSystem(**config) - fs_path = fs.path.join(fs.root_dir, fs_path) + fs_cls = RepoFileSystem else: fs_cls = get_fs_cls(config, scheme=scheme) - fs = fs_cls(**config) + fs = fs_cls(**config) return ReferenceHashFile( fs_path, fs, diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index 6737b4f317..e4dfec3574 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -112,11 +112,14 @@ def _get_used_and_obj( if locked and self.def_repo.get(self.PARAM_REV_LOCK) is None: self.def_repo[self.PARAM_REV_LOCK] = rev - path = os.path.abspath(os.path.join(repo.root_dir, self.def_path)) if not obj_only: try: for odb, obj_ids in repo.used_objs( - [path], + [ + os.path.abspath( + os.path.join(repo.root_dir, self.def_path) + ) + ], force=True, jobs=kwargs.get("jobs"), recursive=True, @@ -132,7 +135,7 @@ def _get_used_and_obj( try: staging, _, staged_obj = stage( local_odb, - path, + self.def_path, repo.repo_fs, local_odb.fs.PARAM_CHECKSUM, ) @@ -172,17 +175,17 @@ def iter_objs(): continue if ( obj.fs.repo_url in checked_urls - or obj.fs.root_dir in checked_urls + or obj.fs._root_dir in checked_urls ): continue self_url = self.repo.url or self.repo.root_dir if ( obj.fs.repo_url is not None and obj.fs.repo_url == self_url - or obj.fs.root_dir == self.repo.root_dir + or obj.fs._root_dir == self.repo.root_dir ): raise CircularImportError(self, obj.fs.repo_url, self_url) - checked_urls.update([obj.fs.repo_url, obj.fs.root_dir]) + checked_urls.update([obj.fs.repo_url, obj.fs._root_dir]) def get_obj(self, filter_info=None, **kwargs): locked = kwargs.get("locked", True) diff --git a/dvc/fs/path.py b/dvc/fs/path.py index 7545fb7a88..c06dc2a59f 100644 --- a/dvc/fs/path.py +++ b/dvc/fs/path.py @@ -81,7 +81,8 @@ def overlaps(self, left, right): return self.isin_or_eq(left, right) or self.isin(right, left) def relpath(self, path, start): - assert start + if not start: + return path return self.flavour.relpath(path, start=start) def relparts(self, path, base): diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index 3551b549f2..4230b4ddf9 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -107,19 +107,19 @@ def __init__( self._main_repo = repo self.hash_jobs = repo.fs.hash_jobs - self.root_dir: str = repo.root_dir + self._root_dir: str = repo.root_dir self._traverse_subrepos = subrepos self._subrepos_trie = PathStringTrie() """Keeps track of each and every path with the corresponding repo.""" - self._subrepos_trie[self.root_dir] = repo + self._subrepos_trie[self._root_dir] = repo self._dvcfss = {} """Keep a dvcfs instance of each repo.""" if hasattr(repo, "dvc_dir"): - self._dvcfss[repo.root_dir] = DvcFileSystem(repo=repo) + self._dvcfss[self._root_dir] = DvcFileSystem(repo=repo) @property def repo_url(self): @@ -131,7 +131,7 @@ def repo_url(self): def config(self): return { self.PARAM_REPO_URL: self.repo_url, - self.PARAM_REPO_ROOT: self.root_dir, + self.PARAM_REPO_ROOT: self._root_dir, self.PARAM_REV: getattr(self._main_repo.fs, "rev", None), self.PARAM_CACHE_DIR: os.path.abspath( self._main_repo.odb.local.cache_dir @@ -202,13 +202,13 @@ def _get_repo(self, path: str) -> Optional["Repo"]: prefix, repo = self._subrepos_trie.longest_prefix(path) if not prefix: - return None + return self._main_repo parents = (parent for parent in self.path.parents(path)) dirs = [path] + list(takewhile(lambda p: p != prefix, parents)) dirs.reverse() self._update(dirs, starting_repo=repo) - return self._subrepos_trie.get(path) + return self._subrepos_trie.get(path) or self._main_repo @wrap_with(threading.Lock()) def _update(self, dirs, starting_repo): @@ -240,22 +240,31 @@ def _get_fs_pair( """ Returns a pair of fss based on repo the path falls in, using prefix. """ - path = os.path.abspath(path) + if os.path.isabs(path): + return None, None, self._main_repo.dvcfs, path - # fallback to the top-level repo if repo was not found - # this can happen if the path is outside of the repo - repo = self._get_repo(path) or self._main_repo + parts = self.path.parts(path) + if parts and parts[0] == os.curdir: + parts = parts[1:] - dvc_fs = self._dvcfss.get(repo.root_dir) + fs_path = self._main_repo.fs.path.join( + self._main_repo.root_dir, *parts + ) + repo = self._get_repo(fs_path) + fs = repo.fs - if path.startswith(repo.root_dir): - dvc_path = path[len(repo.root_dir) + 1 :] + repo_parts = fs.path.relparts(repo.root_dir, self._main_repo.root_dir) + if repo_parts[0] == os.curdir: + repo_parts = repo_parts[1:] - dvc_path = dvc_path.replace("\\", "/") - else: - dvc_path = path + dvc_parts = parts[len(repo_parts) :] + if dvc_parts and dvc_parts[0] == os.curdir: + dvc_parts = dvc_parts[1:] - return repo.fs, path, dvc_fs, dvc_path + dvc_fs = self._dvcfss.get(repo.root_dir) + dvc_path = dvc_fs.path.join(*dvc_parts) if dvc_parts else "" + + return fs, fs_path, dvc_fs, dvc_path def open( self, path, mode="r", encoding="utf-8", **kwargs @@ -397,14 +406,16 @@ def get_file( self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs ): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(from_info) - try: - fs.get_file( # pylint: disable=protected-access - fs_path, to_file, callback=callback, **kwargs - ) - return - except FileNotFoundError: - if not dvc_fs: - raise + + if fs: + try: + fs.get_file( # pylint: disable=protected-access + fs_path, to_file, callback=callback, **kwargs + ) + return + except FileNotFoundError: + if not dvc_fs: + raise dvc_fs.get_file( # pylint: disable=protected-access dvc_path, to_file, callback=callback, **kwargs @@ -417,25 +428,28 @@ def info(self, path, **kwargs): dvcignore = repo.dvcignore ignore_subrepos = kwargs.get("ignore_subrepos", True) - try: - dvc_info = dvc_fs.info(dvc_path) - except FileNotFoundError: - dvc_info = None + dvc_info = None + if dvc_fs: + try: + dvc_info = dvc_fs.info(dvc_path) + except FileNotFoundError: + pass - try: - fs_info = fs.info(fs_path) - if dvcignore.is_ignored( - fs, fs_path, ignore_subrepos=ignore_subrepos - ): - fs_info = None - except (FileNotFoundError, NotADirectoryError): - if not dvc_info: - raise - fs_info = None + fs_info = None + if fs: + try: + fs_info = fs.info(fs_path) + if dvcignore.is_ignored( + fs, fs_path, ignore_subrepos=ignore_subrepos + ): + fs_info = None + except (FileNotFoundError, NotADirectoryError): + if not dvc_info: + raise # NOTE: if some parent in fs_path turns out to be a file, it means # that that whole repofs branch doesn't exist. - if not fs_info and dvc_info: + if fs and not fs_info and dvc_info: for parent in self.path.parents(fs_path): try: if fs.info(parent)["type"] != "directory": diff --git a/dvc/output.py b/dvc/output.py index a532f98603..d4bc274326 100644 --- a/dvc/output.py +++ b/dvc/output.py @@ -386,6 +386,11 @@ def is_in_repo(self): os.path.realpath(self.fs_path), self.repo.root_dir ) + @property + def repo_path(self): + assert self.is_in_repo + return relpath(self.fs_path, self.repo.root_dir) + @property def use_scm_ignore(self): if not self.is_in_repo: diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index bba3ce1b9c..7147b1abc3 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -494,7 +494,6 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None): from dvc.fs.repo import RepoFileSystem fs = RepoFileSystem(self, subrepos=True) - path = self.fs.path.join(self.root_dir, path) try: with fs.open( path, mode=mode, encoding=encoding, remote=remote diff --git a/dvc/repo/collect.py b/dvc/repo/collect.py index 2ba1895fac..c942bf9e92 100644 --- a/dvc/repo/collect.py +++ b/dvc/repo/collect.py @@ -34,22 +34,22 @@ def _collect_paths( from dvc.fs.repo import RepoFileSystem from dvc.utils import relpath - fs_paths = [os.path.abspath(target) for target in targets] fs = RepoFileSystem(repo) target_paths = [] - for fs_path in fs_paths: + for target in targets: + if os.path.isabs(target): + target = relpath(target, repo.root_dir) - if recursive and fs.isdir(fs_path): - target_paths.extend(repo.dvcignore.find(fs, fs_path)) + if recursive and fs.isdir(target): + target_paths.extend(repo.dvcignore.find(fs, target)) - if not fs.exists(fs_path): - rel = relpath(fs_path) + if not fs.exists(target): if rev == "workspace" or rev == "": - logger.warning("'%s' was not found in current workspace.", rel) + logger.warning("'%s' was not found in current workspace.", target) else: - logger.warning("'%s' was not found at: '%s'.", rel, rev) - target_paths.append(fs_path) + logger.warning("'%s' was not found at: '%s'.", target, rev) + target_paths.append(target) return target_paths @@ -58,12 +58,14 @@ def _filter_duplicates( ) -> Tuple[Outputs, StrPaths]: res_outs: Outputs = [] fs_res_paths = fs_paths + from dvc.utils import relpath for out in outs: - if out.fs_path in fs_paths: + rel = relpath(out.fs_path, out.stage.repo.root_dir) + if rel in fs_paths: res_outs.append(out) # MUTATING THE SAME LIST!! - fs_res_paths.remove(out.fs_path) + fs_res_paths.remove(rel) return res_outs, fs_res_paths diff --git a/dvc/repo/get.py b/dvc/repo/get.py index 0b15eb9a45..f1690a40ff 100644 --- a/dvc/repo/get.py +++ b/dvc/repo/get.py @@ -49,9 +49,6 @@ def get(url, path, out=None, rev=None, jobs=None): with external_repo( url=url, rev=rev, cache_dir=tmp_dir, cache_types=cache_types ) as repo: - from_fs_path = os.path.abspath(os.path.join(repo.root_dir, path)) - repo.repo_fs.download( - from_fs_path, os.path.abspath(out), jobs=jobs - ) + repo.repo_fs.download(path, os.path.abspath(out), jobs=jobs) finally: remove(tmp_dir) diff --git a/dvc/repo/ls.py b/dvc/repo/ls.py index 083b7aedc3..eba585578b 100644 --- a/dvc/repo/ls.py +++ b/dvc/repo/ls.py @@ -29,9 +29,7 @@ def ls(url, path=None, rev=None, recursive=None, dvc_only=False): from . import Repo with Repo.open(url, rev=rev, subrepos=True, uninitialized=True) as repo: - fs_path = repo.root_dir - if path: - fs_path = os.path.abspath(repo.fs.path.join(fs_path, path)) + fs_path = path or "" ret = _ls(repo, fs_path, recursive, dvc_only) diff --git a/dvc/repo/metrics/show.py b/dvc/repo/metrics/show.py index 6bb78194e1..784faacf9b 100644 --- a/dvc/repo/metrics/show.py +++ b/dvc/repo/metrics/show.py @@ -80,11 +80,12 @@ def _read_metrics(repo, metrics, rev, onerror=None): res = {} for metric in metrics: - if not fs.isfile(metric): + fs_path = repo.fs.path.relpath(metric, repo.root_dir) + if not fs.isfile(fs_path): continue res[fs.path.relpath(metric, os.getcwd())] = _read_metric( - metric, fs, rev, onerror=onerror + fs_path, fs, rev, onerror=onerror ) return res diff --git a/dvc/repo/plots/__init__.py b/dvc/repo/plots/__init__.py index 303308d639..ef684d6ce2 100644 --- a/dvc/repo/plots/__init__.py +++ b/dvc/repo/plots/__init__.py @@ -251,8 +251,8 @@ def _collect_plots( recursive=recursive, ) - result = {plot.fs_path: _plot_props(plot) for plot in plots} - result.update({fs_path: {} for fs_path in fs_paths}) + result = {relpath(plot.fs_path, repo.root_dir): _plot_props(plot) for plot in plots} + result.update({relpath(fs_path, repo.root_dir): {} for fs_path in fs_paths}) return result diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index d4fa759ba4..c57b3000fc 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -63,7 +63,7 @@ def _get_stage_files(stage: "Stage") -> typing.Iterator[str]: if ( not dep.use_scm_ignore and dep.is_in_repo - and not stage.repo.repo_fs.isdvc(dep.fs_path) + and not stage.repo.repo_fs.isdvc(dep.fs.path.relpath(dep.fs_path, stage.repo.root_dir)) ): yield dep.fs_path for out in stage.outs: diff --git a/tests/func/experiments/test_checkpoints.py b/tests/func/experiments/test_checkpoints.py index 3535a1e724..eab79e18cb 100644 --- a/tests/func/experiments/test_checkpoints.py +++ b/tests/func/experiments/test_checkpoints.py @@ -32,9 +32,9 @@ def test_new_checkpoint( if rev == "workspace": continue fs = dvc.repo_fs - with fs.open((tmp_dir / "foo").fs_path) as fobj: + with fs.open("foo") as fobj: assert fobj.read().strip() == str(checkpoint_stage.iterations) - with fs.open((tmp_dir / "metrics.yaml").fs_path) as fobj: + with fs.open("metrics.yaml") as fobj: assert fobj.read().strip() == "foo: 2" if workspace: @@ -81,9 +81,9 @@ def test_resume_checkpoint( if rev == "workspace": continue fs = dvc.repo_fs - with fs.open((tmp_dir / "foo").fs_path) as fobj: + with fs.open("foo") as fobj: assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open((tmp_dir / "metrics.yaml").fs_path) as fobj: + with fs.open("metrics.yaml") as fobj: assert fobj.read().strip() == "foo: 2" if workspace: @@ -112,9 +112,9 @@ def test_reset_checkpoint( if rev == "workspace": continue fs = dvc.repo_fs - with fs.open((tmp_dir / "foo").fs_path) as fobj: + with fs.open("foo") as fobj: assert fobj.read().strip() == str(checkpoint_stage.iterations) - with fs.open((tmp_dir / "metrics.yaml").fs_path) as fobj: + with fs.open("metrics.yaml") as fobj: assert fobj.read().strip() == "foo: 2" if workspace: @@ -151,18 +151,18 @@ def test_resume_branch(tmp_dir, scm, dvc, checkpoint_stage, workspace): if rev == "workspace": continue fs = dvc.repo_fs - with fs.open((tmp_dir / "foo").fs_path) as fobj: + with fs.open("foo") as fobj: assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open((tmp_dir / "metrics.yaml").fs_path) as fobj: + with fs.open("metrics.yaml") as fobj: assert fobj.read().strip() == "foo: 2" for rev in dvc.brancher([checkpoint_b]): if rev == "workspace": continue fs = dvc.repo_fs - with fs.open((tmp_dir / "foo").fs_path) as fobj: + with fs.open("foo") as fobj: assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open((tmp_dir / "metrics.yaml").fs_path) as fobj: + with fs.open("metrics.yaml") as fobj: assert fobj.read().strip() == "foo: 100" with pytest.raises(MultipleBranchError): diff --git a/tests/func/experiments/test_experiments.py b/tests/func/experiments/test_experiments.py index c16e68806f..082c5ca08e 100644 --- a/tests/func/experiments/test_experiments.py +++ b/tests/func/experiments/test_experiments.py @@ -675,9 +675,9 @@ def test_modified_data_dep(tmp_dir, scm, dvc, workspace, params, target): for rev in dvc.brancher(revs=[exp]): if rev != exp: continue - with dvc.repo_fs.open((tmp_dir / "metrics.yaml").fs_path) as fobj: + with dvc.repo_fs.open("metrics.yaml") as fobj: assert fobj.read().strip() == params - with dvc.repo_fs.open((tmp_dir / "data").fs_path) as fobj: + with dvc.repo_fs.open("data") as fobj: assert fobj.read().strip() == "modified" if workspace: diff --git a/tests/func/test_external_repo.py b/tests/func/test_external_repo.py index 147faf1f49..ae51096e1c 100644 --- a/tests/func/test_external_repo.py +++ b/tests/func/test_external_repo.py @@ -96,7 +96,7 @@ def test_pull_subdir_file(tmp_dir, erepo_dir): dest = tmp_dir / "file" with external_repo(os.fspath(erepo_dir)) as repo: repo.repo_fs.download( - os.path.join(repo.root_dir, "subdir", "file"), + os.path.join("subdir", "file"), os.fspath(dest), ) @@ -192,7 +192,7 @@ def test_subrepos_are_ignored(tmp_dir, erepo_dir): with external_repo(os.fspath(erepo_dir)) as repo: repo.repo_fs.download( - os.path.join(repo.root_dir, "dir"), + "dir", os.fspath(tmp_dir / "out"), ) expected_files = {"foo": "foo", "bar": "bar", ".gitignore": "/foo\n"} @@ -206,7 +206,7 @@ def test_subrepos_are_ignored(tmp_dir, erepo_dir): staging, _, obj = stage( repo.odb.local, - os.path.join(repo.root_dir, "dir"), + "dir", repo.repo_fs, "md5", dvcignore=repo.dvcignore, @@ -238,7 +238,7 @@ def test_subrepos_are_ignored_for_git_tracked_dirs(tmp_dir, erepo_dir): with external_repo(os.fspath(erepo_dir)) as repo: repo.repo_fs.download( - os.path.join(repo.root_dir, "dir"), + "dir", os.fspath(tmp_dir / "out"), ) # subrepo files should not be here diff --git a/tests/func/test_fs.py b/tests/func/test_fs.py index 85d439d2be..663e563d35 100644 --- a/tests/func/test_fs.py +++ b/tests/func/test_fs.py @@ -335,7 +335,7 @@ def test_callback_on_repo_fs(tmp_dir, dvc, scm): callback = fsspec.Callback() fs.download( - (tmp_dir / "dir").fs_path, + "dir", (tmp_dir / "dir2").fs_path, callback=callback, ) @@ -346,7 +346,7 @@ def test_callback_on_repo_fs(tmp_dir, dvc, scm): callback = fsspec.Callback() fs.download( - (tmp_dir / "dir" / "foo").fs_path, + os.path.join("dir", "foo"), (tmp_dir / "foo").fs_path, callback=callback, ) @@ -358,7 +358,7 @@ def test_callback_on_repo_fs(tmp_dir, dvc, scm): callback = fsspec.Callback() fs.download( - (tmp_dir / "dir" / "bar").fs_path, + os.path.join("dir", "bar"), (tmp_dir / "bar").fs_path, callback=callback, ) diff --git a/tests/unit/fs/test_repo.py b/tests/unit/fs/test_repo.py index b0c6d178e2..ea3f39600e 100644 --- a/tests/unit/fs/test_repo.py +++ b/tests/unit/fs/test_repo.py @@ -25,7 +25,7 @@ def test_open(tmp_dir, dvc): (tmp_dir / "foo").unlink() fs = RepoFileSystem(repo=dvc) - with fs.open((tmp_dir / "foo").fs_path, "r") as fobj: + with fs.open("foo", "r") as fobj: assert fobj.read() == "foo" @@ -34,7 +34,7 @@ def test_open_dirty_hash(tmp_dir, dvc): (tmp_dir / "file").write_text("something") fs = RepoFileSystem(repo=dvc) - with fs.open((tmp_dir / "file").fs_path, "r") as fobj: + with fs.open("file", "r") as fobj: assert fobj.read() == "something" @@ -43,7 +43,7 @@ def test_open_dirty_no_hash(tmp_dir, dvc): (tmp_dir / "file.dvc").write_text("outs:\n- path: file\n") fs = RepoFileSystem(repo=dvc) - with fs.open((tmp_dir / "file").fs_path, "r") as fobj: + with fs.open("file", "r") as fobj: assert fobj.read() == "file" @@ -63,7 +63,7 @@ def test_open_in_history(tmp_dir, scm, dvc): continue fs = RepoFileSystem(repo=dvc) - with fs.open((tmp_dir / "foo").fs_path, "r") as fobj: + with fs.open("foo", "r") as fobj: assert fobj.read() == "foo" @@ -130,30 +130,30 @@ def test_exists_isdir_isfile_dirty(tmp_dir, dvc): (tmp_dir / "datafile").unlink() root = tmp_dir - assert fs.exists(root / "datafile") - assert fs.exists(root / "datadir") - assert fs.exists(root / "datadir" / "foo") - assert fs.isfile(root / "datafile") - assert not fs.isfile(root / "datadir") - assert fs.isfile(root / "datadir" / "foo") - assert not fs.isdir(root / "datafile") - assert fs.isdir(root / "datadir") - assert not fs.isdir(root / "datadir" / "foo") + assert fs.exists("datafile") + assert fs.exists("datadir") + assert fs.exists(os.path.join("datadir", "foo")) + assert fs.isfile("datafile") + assert not fs.isfile("datadir") + assert fs.isfile(os.path.join("datadir", "foo")) + assert not fs.isdir("datafile") + assert fs.isdir("datadir") + assert not fs.isdir(os.path.join("datadir", "foo")) # NOTE: creating file instead of dir and dir instead of file tmp_dir.gen({"datadir": "data", "datafile": {"foo": "foo", "bar": "bar"}}) - assert fs.exists(root / "datafile") - assert fs.exists(root / "datadir") - assert not fs.exists(root / "datadir" / "foo") - assert fs.exists(root / "datafile" / "foo") - assert not fs.isfile(root / "datafile") - assert fs.isfile(root / "datadir") - assert not fs.isfile(root / "datadir" / "foo") - assert fs.isfile(root / "datafile" / "foo") - assert fs.isdir(root / "datafile") - assert not fs.isdir(root / "datadir") - assert not fs.isdir(root / "datadir" / "foo") - assert not fs.isdir(root / "datafile" / "foo") + assert fs.exists("datafile") + assert fs.exists("datadir") + assert not fs.exists(os.path.join("datadir", "foo")) + assert fs.exists(os.path.join("datafile", "foo")) + assert not fs.isfile("datafile") + assert fs.isfile("datadir") + assert not fs.isfile(os.path.join("datadir", "foo")) + assert fs.isfile(os.path.join("datafile", "foo")) + assert fs.isdir("datafile") + assert not fs.isdir("datadir") + assert not fs.isdir(os.path.join("datadir", "foo")) + assert not fs.isdir(os.path.join("datafile", "foo")) def test_isdir_mixed(tmp_dir, dvc): @@ -257,11 +257,11 @@ def test_walk_dirty_cached_dir(tmp_dir, scm, dvc): data = tmp_dir / "data" actual = [] - for root, dirs, files in fs.walk(data): + for root, dirs, files in fs.walk("data"): for entry in dirs + files: actual.append(os.path.join(root, entry)) - expected = [(data / "foo").fs_path, (data / "bar").fs_path] + expected = [os.path.join("data", "foo"), os.path.join("data", "bar")] assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -365,30 +365,30 @@ def f(*args, **kwargs): with mock.patch.object( fs, "_get_repo", side_effect=assert_fs_belongs_to_repo(subrepo1.dvc) ): - assert fs.exists((subrepo1 / "foo").fs_path) is True - assert fs.exists((subrepo1 / "bar").fs_path) is False + assert fs.exists(os.path.join("dir", "repo", "foo")) is True + assert fs.exists(os.path.join("dir", "repo", "bar")) is False - assert fs.isfile((subrepo1 / "foo").fs_path) is True - assert fs.isfile((subrepo1 / "dir1" / "bar").fs_path) is True - assert fs.isfile((subrepo1 / "dir1").fs_path) is False + assert fs.isfile(os.path.join("dir", "repo", "foo")) is True + assert fs.isfile(os.path.join("dir", "repo", "dir1", "bar")) is True + assert fs.isfile(os.path.join("dir", "repo", "dir1")) is False - assert fs.isdir((subrepo1 / "dir1").fs_path) is True - assert fs.isdir((subrepo1 / "dir1" / "bar").fs_path) is False - assert fs.isdvc((subrepo1 / "foo").fs_path) is True + assert fs.isdir(os.path.join("dir", "repo", "dir1")) is True + assert fs.isdir(os.path.join("dir", "repo", "dir1", "bar")) is False + assert fs.isdvc(os.path.join("dir", "repo", "foo")) is True with mock.patch.object( fs, "_get_repo", side_effect=assert_fs_belongs_to_repo(subrepo2.dvc) ): - assert fs.exists((subrepo2 / "lorem").fs_path) is True - assert fs.exists((subrepo2 / "ipsum").fs_path) is False + assert fs.exists(os.path.join("dir", "repo2", "lorem")) is True + assert fs.exists(os.path.join("dir", "repo2", "ipsum")) is False - assert fs.isfile((subrepo2 / "lorem").fs_path) is True - assert fs.isfile((subrepo2 / "dir2" / "ipsum").fs_path) is True - assert fs.isfile((subrepo2 / "dir2").fs_path) is False + assert fs.isfile(os.path.join("dir", "repo2", "lorem")) is True + assert fs.isfile(os.path.join("dir", "repo2", "dir2", "ipsum")) is True + assert fs.isfile(os.path.join("dir", "repo2", "dir2")) is False - assert fs.isdir((subrepo2 / "dir2").fs_path) is True - assert fs.isdir((subrepo2 / "dir2" / "ipsum").fs_path) is False - assert fs.isdvc((subrepo2 / "lorem").fs_path) is True + assert fs.isdir(os.path.join("dir", "repo2", "dir2")) is True + assert fs.isdir(os.path.join("dir", "repo2", "dir2", "ipsum")) is False + assert fs.isdvc(os.path.join("dir", "repo2", "lorem")) is True @pytest.mark.parametrize( @@ -447,16 +447,14 @@ def test_subrepo_walk(tmp_dir, scm, dvc, dvcfiles, extra_expected): actual = [] for root, dirs, files in fs.walk( - os.path.join(fs.root_dir, "dir"), + "dir", dvcfiles=dvcfiles, ignore_subrepos=False, ): for entry in dirs + files: actual.append(os.path.join(root, entry)) - expected = [ - os.path.join(fs.root_dir, path) for path in expected + extra_expected - ] + expected += extra_expected assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -478,44 +476,43 @@ def test_repo_fs_no_subrepos(tmp_dir, dvc, scm): dvc._reset() fs = RepoFileSystem(repo=dvc) expected = [ - tmp_dir / ".dvcignore", - tmp_dir / ".gitignore", - tmp_dir / "lorem", - tmp_dir / "lorem.dvc", - tmp_dir / "dir", - tmp_dir / "dir" / "repo.txt", + ".dvcignore", + ".gitignore", + "lorem", + "lorem.dvc", + "dir", + os.path.join("dir", "repo.txt"), ] actual = [] - for root, dirs, files in fs.walk(tmp_dir.fs_path, dvcfiles=True): + for root, dirs, files in fs.walk("", dvcfiles=True): for entry in dirs + files: - actual.append(os.path.normpath(os.path.join(root, entry))) + actual.append(os.path.join(root, entry)) - expected = [str(path) for path in expected] assert set(actual) == set(expected) assert len(actual) == len(expected) - assert fs.isfile(tmp_dir / "lorem") is True - assert fs.isfile(tmp_dir / "dir" / "repo" / "foo") is False - assert fs.isdir(tmp_dir / "dir" / "repo") is False - assert fs.isdir(tmp_dir / "dir") is True + assert fs.isfile("lorem") is True + assert fs.isfile(os.path.join("dir", "repo", "foo")) is False + assert fs.isdir(os.path.join("dir", "repo")) is False + assert fs.isdir("dir") is True - assert fs.isdvc(tmp_dir / "lorem") is True - assert fs.isdvc(tmp_dir / "dir" / "repo" / "dir1") is False + assert fs.isdvc("lorem") is True + assert fs.isdvc(os.path.join("dir", "repo", "dir1")) is False - assert fs.exists(tmp_dir / "dir" / "repo.txt") is True - assert fs.exists(tmp_dir / "repo" / "ipsum") is False + assert fs.exists(os.path.join("dir", "repo.txt")) is True + assert fs.exists(os.path.join("repo", "ipsum")) is False def test_get_hash_cached_file(tmp_dir, dvc, mocker): tmp_dir.dvc_gen({"foo": "foo"}) fs = RepoFileSystem(repo=dvc) expected = "acbd18db4cc2f85cedef654fccc4a4d8" - assert fs.info((tmp_dir / "foo").fs_path).get("md5") is None - _, _, obj = stage(dvc.odb.local, (tmp_dir / "foo").fs_path, fs, "md5") + assert fs.info("foo").get("md5") is None + _, _, obj = stage(dvc.odb.local, "foo", fs, "md5") assert obj.hash_info == HashInfo("md5", expected) (tmp_dir / "foo").unlink() - assert fs.info((tmp_dir / "foo").fs_path)["md5"] == expected + assert fs.info("foo")["md5"] == expected def test_get_hash_cached_dir(tmp_dir, dvc, mocker): @@ -524,15 +521,15 @@ def test_get_hash_cached_dir(tmp_dir, dvc, mocker): ) fs = RepoFileSystem(repo=dvc) expected = "8761c4e9acad696bee718615e23e22db.dir" - assert fs.info((tmp_dir / "dir").fs_path).get("md5") is None - _, _, obj = stage(dvc.odb.local, (tmp_dir / "dir").fs_path, fs, "md5") + assert fs.info("dir").get("md5") is None + _, _, obj = stage(dvc.odb.local, "dir", fs, "md5") assert obj.hash_info == HashInfo( "md5", "8761c4e9acad696bee718615e23e22db.dir" ) shutil.rmtree(tmp_dir / "dir") - assert fs.info((tmp_dir / "dir").fs_path)["md5"] == expected - _, _, obj = stage(dvc.odb.local, (tmp_dir / "dir").fs_path, fs, "md5") + assert fs.info("dir")["md5"] == expected + _, _, obj = stage(dvc.odb.local, "dir", fs, "md5") assert obj.hash_info == HashInfo( "md5", "8761c4e9acad696bee718615e23e22db.dir" ) @@ -543,18 +540,18 @@ def test_get_hash_cached_granular(tmp_dir, dvc, mocker): {"dir": {"foo": "foo", "bar": "bar", "subdir": {"data": "data"}}} ) fs = RepoFileSystem(repo=dvc) - subdir = tmp_dir / "dir" / "subdir" - assert fs.info(subdir.fs_path).get("md5") is None - _, _, obj = stage(dvc.odb.local, subdir.fs_path, fs, "md5") + subdir = os.path.join("dir", "subdir") + assert fs.info(subdir).get("md5") is None + _, _, obj = stage(dvc.odb.local, subdir, fs, "md5") assert obj.hash_info == HashInfo( "md5", "af314506f1622d107e0ed3f14ec1a3b5.dir" ) - assert fs.info((subdir / "data").fs_path).get("md5") is None - _, _, obj = stage(dvc.odb.local, (subdir / "data").fs_path, fs, "md5") + assert fs.info(os.path.join(subdir, "data")).get("md5") is None + _, _, obj = stage(dvc.odb.local, os.path.join(subdir, "data"), fs, "md5") assert obj.hash_info == HashInfo("md5", "8d777f385d3dfec8815d20f7496026dc") (tmp_dir / "dir" / "subdir" / "data").unlink() assert ( - fs.info((subdir / "data").fs_path)["md5"] + fs.info(os.path.join(subdir, "data"))["md5"] == "8d777f385d3dfec8815d20f7496026dc" ) @@ -573,7 +570,7 @@ def test_get_hash_mixed_dir(tmp_dir, scm, dvc): clean_staging() fs = RepoFileSystem(repo=dvc) - _, _, obj = stage(dvc.odb.local, (tmp_dir / "dir").fs_path, fs, "md5") + _, _, obj = stage(dvc.odb.local, "dir", fs, "md5") assert obj.hash_info == HashInfo( "md5", "e1d9e8eae5374860ae025ec84cfd85c7.dir" ) @@ -595,9 +592,9 @@ def test_get_hash_dirty_file(tmp_dir, dvc): # file is modified in workspace # get_file_hash(file) should return workspace hash, not DVC cached hash fs = RepoFileSystem(repo=dvc) - assert fs.info((tmp_dir / "file").fs_path).get("md5") is None + assert fs.info("file").get("md5") is None staging, _, obj = stage( - dvc.odb.local, (tmp_dir / "file").fs_path, fs, "md5" + dvc.odb.local, "file", fs, "md5" ) assert obj.hash_info == something_hash_info check(staging, obj) @@ -609,15 +606,15 @@ def test_get_hash_dirty_file(tmp_dir, dvc): check(staging, obj) # get_file_hash(file) should return DVC cached hash - assert fs.info((tmp_dir / "file").fs_path)["md5"] == file_hash_info.value + assert fs.info("file")["md5"] == file_hash_info.value _, hash_info = get_file_hash( - (tmp_dir / "file").fs_path, fs, "md5", state=dvc.state + "file", fs, "md5", state=dvc.state ) assert hash_info == file_hash_info # tmp_dir/file can be staged even though it is missing in workspace since # repofs will use the DVC cached hash (and refer to the local cache object) - _, _, obj = stage(dvc.odb.local, (tmp_dir / "file").fs_path, fs, "md5") + _, _, obj = stage(dvc.odb.local, "file", fs, "md5") assert obj.hash_info == file_hash_info @@ -627,7 +624,7 @@ def test_get_hash_dirty_dir(tmp_dir, dvc): clean_staging() fs = RepoFileSystem(repo=dvc) - _, meta, obj = stage(dvc.odb.local, (tmp_dir / "dir").fs_path, fs, "md5") + _, meta, obj = stage(dvc.odb.local, "dir", fs, "md5") assert obj.hash_info == HashInfo( "md5", "ba75a2162ca9c29acecb7957105a0bc2.dir" ) @@ -665,23 +662,24 @@ def dvc_structure(suffix): repo_dir.dvc_gen(dvc_files, commit=f"dvc add in {repo_dir}") if traverse_subrepos or repo_dir == tmp_dir: - expected[str(repo_dir)] = set( + repo_dir_path = repo_dir.relative_to(tmp_dir).fs_path if repo_dir != tmp_dir else "" + expected[repo_dir_path] = set( scm_files.keys() | dvc_files.keys() | extras ) # files inside a dvc directory - expected[str(repo_dir / f"dvc-{base}")] = {f"ipsum-{base}"} + expected[os.path.join(repo_dir_path, f"dvc-{base}")] = {f"ipsum-{base}"} # files inside a git directory - expected[str(repo_dir / f"dir-{base}")] = {f"bar-{base}"} + expected[os.path.join(repo_dir_path, f"dir-{base}")] = {f"bar-{base}"} if traverse_subrepos: # update subrepos - expected[str(tmp_dir)].update(["subrepo1", "subrepo2"]) - expected[str(tmp_dir / "subrepo1")].add("subrepo3") + expected[""].update(["subrepo1", "subrepo2"]) + expected["subrepo1"].add("subrepo3") actual = {} fs = RepoFileSystem(repo=dvc) for root, dirs, files in fs.walk( - str(tmp_dir), ignore_subrepos=not traverse_subrepos + "", ignore_subrepos=not traverse_subrepos ): actual[root] = set(dirs + files) assert expected == actual diff --git a/tests/unit/fs/test_repo_info.py b/tests/unit/fs/test_repo_info.py index bc7b285d56..79af258bb8 100644 --- a/tests/unit/fs/test_repo_info.py +++ b/tests/unit/fs/test_repo_info.py @@ -61,7 +61,7 @@ def test_info_not_existing(repo_fs): def test_info_git_tracked_file(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs.root_dir + assert info["repo"].root_dir == repo_fs._root_dir assert not info["isdvc"] assert info["type"] == "file" assert not info["isexec"] @@ -80,7 +80,7 @@ def test_info_git_tracked_file(repo_fs, path): def test_info_dvc_tracked_file(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs.root_dir + assert info["repo"].root_dir == repo_fs._root_dir assert info["isdvc"] assert info["type"] == "file" assert not info["isexec"] @@ -90,7 +90,7 @@ def test_info_dvc_tracked_file(repo_fs, path): def test_info_git_only_dirs(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs.root_dir + assert info["repo"].root_dir == repo_fs._root_dir assert not info["isdvc"] assert info["type"] == "directory" assert not info["isexec"] @@ -98,9 +98,9 @@ def test_info_git_only_dirs(repo_fs, path): @pytest.mark.parametrize("path", [".", "models"]) def test_info_git_dvc_mixed_dirs(repo_fs, path): - info = repo_fs.info(os.path.join(repo_fs.root_dir, path)) + info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs.root_dir + assert info["repo"].root_dir == repo_fs._root_dir assert not info["isdvc"] assert info["type"] == "directory" assert not info["isexec"] @@ -115,9 +115,9 @@ def test_info_git_dvc_mixed_dirs(repo_fs, path): ], ) def test_info_dvc_only_dirs(repo_fs, path): - info = repo_fs.info(os.path.join(repo_fs.root_dir, path)) + info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs.root_dir + assert info["repo"].root_dir == repo_fs._root_dir assert info["isdvc"] assert info["type"] == "directory" assert not info["isexec"] @@ -135,7 +135,7 @@ def test_info_on_subrepos(make_tmp_dir, tmp_dir, dvc, scm, repo_fs): os.path.join("subrepo", "foo"), os.path.join("subrepo", "foobar"), ]: - info = repo_fs.info(tmp_dir / path) + info = repo_fs.info(path) assert info["repo"].root_dir == str( subrepo ), f"repo root didn't match for {path}" diff --git a/tests/unit/test_external_repo.py b/tests/unit/test_external_repo.py index 10fc790b15..c4ff4d4646 100644 --- a/tests/unit/test_external_repo.py +++ b/tests/unit/test_external_repo.py @@ -28,7 +28,7 @@ def test_hook_is_called(tmp_dir, erepo_dir, mocker): with external_repo(str(erepo_dir)) as repo: spy = mocker.spy(repo.repo_fs, "repo_factory") - list(repo.repo_fs.walk(repo.root_dir, ignore_subrepos=False)) # drain + list(repo.repo_fs.walk("", ignore_subrepos=False)) # drain assert spy.call_count == len(subrepos) paths = [os.path.join(repo.root_dir, path) for path in subrepo_paths] @@ -65,7 +65,7 @@ def test_subrepo_is_constructed_properly( ) as repo: spy = mocker.spy(repo.repo_fs, "repo_factory") - list(repo.repo_fs.walk(repo.root_dir, ignore_subrepos=False)) # drain + list(repo.repo_fs.walk("", ignore_subrepos=False)) # drain assert spy.call_count == 1 subrepo = spy.spy_return From 99af74f75a0e296d2088cbda4bc486aec4230cdd Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Wed, 13 Apr 2022 18:41:57 +0300 Subject: [PATCH 7/8] repofs: rename _main_repo -> repo --- dvc/fs/repo.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index 4230b4ddf9..0f9582f746 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -105,7 +105,7 @@ def __init__( else: self.repo_factory = repo_factory - self._main_repo = repo + self.repo = repo self.hash_jobs = repo.fs.hash_jobs self._root_dir: str = repo.root_dir self._traverse_subrepos = subrepos @@ -123,20 +123,20 @@ def __init__( @property def repo_url(self): - if self._main_repo is None: + if self.repo is None: return None - return self._main_repo.url + return self.repo.url @property def config(self): return { self.PARAM_REPO_URL: self.repo_url, self.PARAM_REPO_ROOT: self._root_dir, - self.PARAM_REV: getattr(self._main_repo.fs, "rev", None), + self.PARAM_REV: getattr(self.repo.fs, "rev", None), self.PARAM_CACHE_DIR: os.path.abspath( - self._main_repo.odb.local.cache_dir + self.repo.odb.local.cache_dir ), - self.PARAM_CACHE_TYPES: self._main_repo.odb.local.cache_types, + self.PARAM_CACHE_TYPES: self.repo.odb.local.cache_types, self.PARAM_SUBREPOS: self._traverse_subrepos, } @@ -202,13 +202,13 @@ def _get_repo(self, path: str) -> Optional["Repo"]: prefix, repo = self._subrepos_trie.longest_prefix(path) if not prefix: - return self._main_repo + return self.repo parents = (parent for parent in self.path.parents(path)) dirs = [path] + list(takewhile(lambda p: p != prefix, parents)) dirs.reverse() self._update(dirs, starting_repo=repo) - return self._subrepos_trie.get(path) or self._main_repo + return self._subrepos_trie.get(path) or self.repo @wrap_with(threading.Lock()) def _update(self, dirs, starting_repo): @@ -218,7 +218,7 @@ def _update(self, dirs, starting_repo): if self._is_dvc_repo(d): repo = self.repo_factory( d, - fs=self._main_repo.fs, + fs=self.repo.fs, repo_factory=self.repo_factory, ) self._dvcfss[repo.root_dir] = DvcFileSystem(repo=repo) @@ -232,7 +232,7 @@ def _is_dvc_repo(self, dir_path): from dvc.repo import Repo repo_path = os.path.join(dir_path, Repo.DVC_DIR) - return self._main_repo.fs.isdir(repo_path) + return self.repo.fs.isdir(repo_path) def _get_fs_pair( self, path @@ -241,19 +241,17 @@ def _get_fs_pair( Returns a pair of fss based on repo the path falls in, using prefix. """ if os.path.isabs(path): - return None, None, self._main_repo.dvcfs, path + return None, None, self.repo.dvcfs, path parts = self.path.parts(path) if parts and parts[0] == os.curdir: parts = parts[1:] - fs_path = self._main_repo.fs.path.join( - self._main_repo.root_dir, *parts - ) + fs_path = self.repo.fs.path.join(self.repo.root_dir, *parts) repo = self._get_repo(fs_path) fs = repo.fs - repo_parts = fs.path.relparts(repo.root_dir, self._main_repo.root_dir) + repo_parts = fs.path.relparts(repo.root_dir, self.repo.root_dir) if repo_parts[0] == os.curdir: repo_parts = repo_parts[1:] @@ -307,7 +305,7 @@ def isfile(self, path, **kwargs): # pylint: disable=arguments-renamed def ls(self, path, detail=True, **kwargs): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - repo = dvc_fs.repo if dvc_fs else self._main_repo + repo = dvc_fs.repo if dvc_fs else self.repo dvcignore = repo.dvcignore ignore_subrepos = kwargs.get("ignore_subrepos", True) @@ -424,7 +422,7 @@ def get_file( def info(self, path, **kwargs): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) - repo = dvc_fs.repo if dvc_fs else self._main_repo + repo = dvc_fs.repo if dvc_fs else self.repo dvcignore = repo.dvcignore ignore_subrepos = kwargs.get("ignore_subrepos", True) From 1ee5807adcac48d968c4146bc11455cecc7836ac Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Wed, 13 Apr 2022 21:15:49 +0300 Subject: [PATCH 8/8] repofs: migrate to fsspec --- dvc/api.py | 13 +- dvc/data/reference.py | 7 +- dvc/dependency/repo.py | 15 +-- dvc/fs/git.py | 4 +- dvc/fs/path.py | 2 - dvc/fs/repo.py | 199 ++++++++++++++---------------- dvc/ignore.py | 2 +- dvc/output.py | 5 - dvc/repo/__init__.py | 6 +- dvc/repo/collect.py | 28 ++--- dvc/repo/diff.py | 7 +- dvc/repo/ls.py | 45 ++++--- dvc/repo/metrics/show.py | 9 +- dvc/repo/plots/__init__.py | 8 +- dvc/repo/reproduce.py | 2 +- dvc/utils/__init__.py | 7 ++ tests/func/test_external_repo.py | 2 +- tests/unit/fs/test_repo.py | 204 ++++++++++++++++--------------- tests/unit/fs/test_repo_info.py | 54 ++++---- tests/unit/test_external_repo.py | 6 +- 20 files changed, 300 insertions(+), 325 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index 28b5c04384..9db246ad38 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -22,17 +22,14 @@ def get_url(path, repo=None, rev=None, remote=None): with reraise(FileNotFoundError, PathMissingError(path, repo)): info = _repo.repo_fs.info(path) - if not info["isdvc"]: + dvc_info = info.get("dvc_info") + if not dvc_info: raise OutputNotFoundError(path, repo) - cloud = info["repo"].cloud - _, _, dvc_fs, dvc_path = _repo.repo_fs._get_fs_pair(path) + dvc_repo = info["repo"] + md5 = dvc_info["md5"] - if not os.path.isabs(path): - dvc_path = dvc_path.replace("\\", "/") - - md5 = info["repo"].dvcfs.info(dvc_path)["md5"] - return cloud.get_url_for(remote, checksum=md5) + return dvc_repo.cloud.get_url_for(remote, checksum=md5) def open( # noqa, pylint: disable=redefined-builtin diff --git a/dvc/data/reference.py b/dvc/data/reference.py index 0cd28274e3..ec391bf689 100644 --- a/dvc/data/reference.py +++ b/dvc/data/reference.py @@ -30,7 +30,6 @@ def __init__( checksum: Optional[str] = None, **kwargs, ): - from dvc.fs.repo import RepoFileSystem super().__init__(fs_path, fs, hash_info, **kwargs) self.checksum = checksum or fs.checksum(fs_path) @@ -53,8 +52,6 @@ def _get_checksum(self) -> str: return self.fs.checksum(self.fs_path) def to_bytes(self): - from dvc.fs.repo import RepoFileSystem - # NOTE: dumping reference FS's this way is insecure, as the # fully parsed remote FS config will include credentials # @@ -75,7 +72,7 @@ def to_bytes(self): @classmethod def from_bytes(cls, data: bytes, fs_cache: Optional[dict] = None): from dvc.fs import get_fs_cls - from dvc.fs.repo import RepoFileSystem + from dvc.fs.repo import RepoFileSystem, _RepoFileSystem try: dict_ = pickle.loads(data) @@ -92,7 +89,7 @@ def from_bytes(cls, data: bytes, fs_cache: Optional[dict] = None): fs = fs_cache.get((scheme, config_pairs)) if fs_cache else None if not fs: config = dict(config_pairs) - if RepoFileSystem.PARAM_REPO_URL in config: + if _RepoFileSystem.PARAM_REPO_URL in config: fs_cls = RepoFileSystem else: fs_cls = get_fs_cls(config, scheme=scheme) diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index e4dfec3574..45111bb456 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -101,6 +101,7 @@ def _get_used_and_obj( from dvc.data.stage import stage from dvc.data.tree import Tree, TreeError from dvc.exceptions import NoOutputOrStageError, PathMissingError + from dvc.utils import as_posix local_odb = self.repo.odb.local locked = kwargs.pop("locked", True) @@ -115,11 +116,7 @@ def _get_used_and_obj( if not obj_only: try: for odb, obj_ids in repo.used_objs( - [ - os.path.abspath( - os.path.join(repo.root_dir, self.def_path) - ) - ], + [os.path.join(repo.root_dir, self.def_path)], force=True, jobs=kwargs.get("jobs"), recursive=True, @@ -135,7 +132,7 @@ def _get_used_and_obj( try: staging, _, staged_obj = stage( local_odb, - self.def_path, + as_posix(self.def_path), repo.repo_fs, local_odb.fs.PARAM_CHECKSUM, ) @@ -175,17 +172,17 @@ def iter_objs(): continue if ( obj.fs.repo_url in checked_urls - or obj.fs._root_dir in checked_urls + or obj.fs.repo.root_dir in checked_urls ): continue self_url = self.repo.url or self.repo.root_dir if ( obj.fs.repo_url is not None and obj.fs.repo_url == self_url - or obj.fs._root_dir == self.repo.root_dir + or obj.fs.repo.root_dir == self.repo.root_dir ): raise CircularImportError(self, obj.fs.repo_url, self_url) - checked_urls.update([obj.fs.repo_url, obj.fs._root_dir]) + checked_urls.update([obj.fs.repo_url, obj.fs.repo.root_dir]) def get_obj(self, filter_info=None, **kwargs): locked = kwargs.get("locked", True) diff --git a/dvc/fs/git.py b/dvc/fs/git.py index 13c43d5547..314513a0bd 100644 --- a/dvc/fs/git.py +++ b/dvc/fs/git.py @@ -51,5 +51,5 @@ def fs(self) -> "FsspecGitFileSystem": def rev(self) -> str: return self.fs.rev - def ls(self, path, **kwargs): - return self.fs.ls(path, **kwargs) or [] + def ls(self, path, detail=True, **kwargs): + return self.fs.ls(path, detail=detail, **kwargs) or [] diff --git a/dvc/fs/path.py b/dvc/fs/path.py index c06dc2a59f..fdbdb3e4b9 100644 --- a/dvc/fs/path.py +++ b/dvc/fs/path.py @@ -81,8 +81,6 @@ def overlaps(self, left, right): return self.isin_or_eq(left, right) or self.isin(right, left) def relpath(self, path, start): - if not start: - return path return self.flavour.relpath(path, start=start) def relparts(self, path, base): diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index 0f9582f746..1de35f3aa1 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -5,11 +5,14 @@ from itertools import takewhile from typing import TYPE_CHECKING, Callable, Optional, Tuple, Type, Union -from funcy import wrap_with +from fsspec.spec import AbstractFileSystem +from funcy import cached_property, wrap_prop, wrap_with from ._callback import DEFAULT_CALLBACK from .base import FileSystem from .dvc import DvcFileSystem +from .fsspec_wrapper import FSSpecWrapper +from .path import Path if TYPE_CHECKING: from dvc.repo import Repo @@ -41,27 +44,27 @@ def _ls(fs, path): def _merge_info(repo, fs_info, dvc_info): from dvc.utils import is_exec - if not fs_info: - dvc_info["repo"] = repo - dvc_info["isdvc"] = True - return dvc_info + ret = {"repo": repo} - fs_info["repo"] = repo - fs_info["isout"] = dvc_info.get("isout", False) if dvc_info else False - fs_info["outs"] = dvc_info["outs"] if dvc_info else None - fs_info["isdvc"] = dvc_info["isdvc"] if dvc_info else False - fs_info["meta"] = dvc_info.get("meta") if dvc_info else None - - isexec = False if dvc_info: - isexec = dvc_info["isexec"] - elif fs_info["type"] == "file": - isexec = is_exec(fs_info["mode"]) - fs_info["isexec"] = isexec - return fs_info + ret["dvc_info"] = dvc_info + ret["type"] = dvc_info["type"] + ret["size"] = dvc_info["size"] + if not fs_info and "md5" in dvc_info: + ret["md5"] = dvc_info["md5"] + + if fs_info: + ret["type"] = fs_info["type"] + ret["size"] = fs_info["size"] + isexec = False + if fs_info["type"] == "file": + isexec = is_exec(fs_info["mode"]) + ret["isexec"] = isexec + return ret -class RepoFileSystem(FileSystem): # pylint:disable=abstract-method + +class _RepoFileSystem(AbstractFileSystem): # pylint:disable=abstract-method """DVC + git-tracked files fs. Args: @@ -71,10 +74,6 @@ class RepoFileSystem(FileSystem): # pylint:disable=abstract-method kwargs: Additional keyword arguments passed to the `DvcFileSystem()`. """ - sep = os.sep - - scheme = "local" - PARAM_CHECKSUM = "md5" PARAM_REPO_URL = "repo_url" PARAM_REPO_ROOT = "repo_root" PARAM_REV = "rev" @@ -105,6 +104,7 @@ def __init__( else: self.repo_factory = repo_factory + self.path = Path(self.sep) self.repo = repo self.hash_jobs = repo.fs.hash_jobs self._root_dir: str = repo.root_dir @@ -131,7 +131,7 @@ def repo_url(self): def config(self): return { self.PARAM_REPO_URL: self.repo_url, - self.PARAM_REPO_ROOT: self._root_dir, + self.PARAM_REPO_ROOT: self.repo.root_dir, self.PARAM_REV: getattr(self.repo.fs, "rev", None), self.PARAM_CACHE_DIR: os.path.abspath( self.repo.odb.local.cache_dir @@ -188,7 +188,7 @@ def _open(*args, **kwargs): ) as repo: return repo, factory - def _get_repo(self, path: str) -> Optional["Repo"]: + def _get_repo(self, path: str) -> "Repo": """Returns repo that the path falls in, using prefix. If the path is already tracked/collected, it just returns the repo. @@ -204,7 +204,7 @@ def _get_repo(self, path: str) -> Optional["Repo"]: if not prefix: return self.repo - parents = (parent for parent in self.path.parents(path)) + parents = (parent for parent in self.repo.fs.path.parents(path)) dirs = [path] + list(takewhile(lambda p: p != prefix, parents)) dirs.reverse() self._update(dirs, starting_repo=repo) @@ -236,12 +236,24 @@ def _is_dvc_repo(self, dir_path): def _get_fs_pair( self, path - ) -> Tuple[FileSystem, Optional[DvcFileSystem], str]: + ) -> Tuple[ + Optional[FileSystem], + Optional[str], + Optional[DvcFileSystem], + Optional[str], + ]: """ Returns a pair of fss based on repo the path falls in, using prefix. """ + from dvc.utils import as_posix + if os.path.isabs(path): - return None, None, self.repo.dvcfs, path + if self.repo.fs.path.isin_or_eq(path, self.repo.root_dir): + path = self.repo.fs.path.relpath(path, self.repo.root_dir) + else: + return None, None, self.repo.dvcfs, path + + path = as_posix(path) parts = self.path.parts(path) if parts and parts[0] == os.curdir: @@ -260,13 +272,16 @@ def _get_fs_pair( dvc_parts = dvc_parts[1:] dvc_fs = self._dvcfss.get(repo.root_dir) - dvc_path = dvc_fs.path.join(*dvc_parts) if dvc_parts else "" + if dvc_fs: + dvc_path = dvc_fs.path.join(*dvc_parts) if dvc_parts else "" + else: + dvc_path = None return fs, fs_path, dvc_fs, dvc_path def open( self, path, mode="r", encoding="utf-8", **kwargs - ): # pylint: disable=arguments-renamed + ): # pylint: disable=arguments-renamed, arguments-differ if "b" in mode: encoding = None @@ -279,29 +294,10 @@ def open( return dvc_fs.open(dvc_path, mode=mode, encoding=encoding, **kwargs) - def exists(self, path, **kwargs) -> bool: - try: - self.info(path, **kwargs) - return True - except FileNotFoundError: - return False - - def isdir(self, path, **kwargs): # pylint: disable=arguments-renamed - try: - return self.info(path, **kwargs)["type"] == "directory" - except FileNotFoundError: - return False - def isdvc(self, path, **kwargs): _, _, dvc_fs, dvc_path = self._get_fs_pair(path) return dvc_fs is not None and dvc_fs.isdvc(dvc_path, **kwargs) - def isfile(self, path, **kwargs): # pylint: disable=arguments-renamed - try: - return self.info(path, **kwargs)["type"] == "file" - except FileNotFoundError: - return False - def ls(self, path, detail=True, **kwargs): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) @@ -315,13 +311,14 @@ def ls(self, path, detail=True, **kwargs): for entry in dvc_fs.ls(dvc_path, detail=False): names.add(dvc_fs.path.name(entry)) - try: - for entry in dvcignore.ls( - fs, fs_path, detail=False, ignore_subrepos=ignore_subrepos - ): - names.add(fs.path.name(entry)) - except (FileNotFoundError, NotADirectoryError): - pass + if fs: + try: + for entry in dvcignore.ls( + fs, fs_path, detail=False, ignore_subrepos=ignore_subrepos + ): + names.add(fs.path.name(entry)) + except (FileNotFoundError, NotADirectoryError): + pass dvcfiles = kwargs.get("dvcfiles", False) @@ -354,61 +351,13 @@ def _func(fname): return infos - def walk(self, top, topdown=True, **kwargs): - """Walk and merge both DVC and repo fss. - - Args: - top: path to walk from - topdown: if True, fs will be walked from top down. - dvcfiles: if True, dvcfiles will be included in the files list - for walked directories. - - Any kwargs will be passed into methods used for fetching and/or - streaming DVC outs from remotes. - """ - dirs = [] - nondirs = [] - - ignore_subrepos = kwargs.get("ignore_subrepos", True) - - if not self.exists(top, ignore_subrepos=ignore_subrepos): - return - - if not self.isdir(top, ignore_subrepos=ignore_subrepos): - return - - for entry in self.ls(top, **kwargs): - name = self.path.name(entry["name"]) - if entry["type"] == "directory": - dirs.append(name) - else: - nondirs.append(name) - - if topdown: - yield top, dirs, nondirs - - for dname in dirs: - yield from self.walk( - self.path.join(top, dname), topdown=topdown, **kwargs - ) - - if not topdown: - yield top, dirs, nondirs - - def find(self, path, prefix=None): - for root, _, files in self.walk(path): - for fname in files: - yield self.path.join(root, fname) - - def get_file( - self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs - ): - fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(from_info) + def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, **kwargs): + fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(rpath) if fs: try: fs.get_file( # pylint: disable=protected-access - fs_path, to_file, callback=callback, **kwargs + fs_path, lpath, callback=callback, **kwargs ) return except FileNotFoundError: @@ -416,7 +365,7 @@ def get_file( raise dvc_fs.get_file( # pylint: disable=protected-access - dvc_path, to_file, callback=callback, **kwargs + dvc_path, lpath, callback=callback, **kwargs ) def info(self, path, **kwargs): @@ -446,9 +395,9 @@ def info(self, path, **kwargs): raise # NOTE: if some parent in fs_path turns out to be a file, it means - # that that whole repofs branch doesn't exist. + # that the whole repofs branch doesn't exist. if fs and not fs_info and dvc_info: - for parent in self.path.parents(fs_path): + for parent in fs.path.parents(fs_path): try: if fs.info(parent)["type"] != "directory": dvc_info = None @@ -459,7 +408,9 @@ def info(self, path, **kwargs): if not dvc_info and not fs_info: raise FileNotFoundError - return _merge_info(dvc_fs.repo, fs_info, dvc_info) + info = _merge_info(dvc_fs.repo, fs_info, dvc_info) + info["name"] = path + return info def checksum(self, path): fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(path) @@ -468,3 +419,31 @@ def checksum(self, path): return fs.checksum(fs_path) except FileNotFoundError: return dvc_fs.checksum(dvc_path) + + +class RepoFileSystem(FSSpecWrapper): + scheme = "local" + PARAM_CHECKSUM = "md5" + + def _prepare_credentials(self, **config): + return config + + @wrap_prop(threading.Lock()) + @cached_property + def fs(self): + return _RepoFileSystem(**self.fs_args) + + def isdvc(self, path, **kwargs): + return self.fs.isdvc(path, **kwargs) + + @property + def repo(self): + return self.fs.repo + + @property + def repo_url(self): + return self.fs.repo_url + + @property + def config(self): + return self.fs.config diff --git a/dvc/ignore.py b/dvc/ignore.py index eaff9768d1..0e81482390 100644 --- a/dvc/ignore.py +++ b/dvc/ignore.py @@ -303,7 +303,7 @@ def find(self, fs: FileSystem, path: AnyPath, **kwargs): for root, _, files in self.walk(fs, path, **kwargs): for file in files: # NOTE: os.path.join is ~5.5 times slower - yield f"{root}{os.sep}{file}" + yield f"{root}{fs.sep}{file}" else: yield from fs.find(path) diff --git a/dvc/output.py b/dvc/output.py index d4bc274326..a532f98603 100644 --- a/dvc/output.py +++ b/dvc/output.py @@ -386,11 +386,6 @@ def is_in_repo(self): os.path.realpath(self.fs_path), self.repo.root_dir ) - @property - def repo_path(self): - assert self.is_in_repo - return relpath(self.fs_path, self.repo.root_dir) - @property def use_scm_ignore(self): if not self.is_in_repo: diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 7147b1abc3..dbbf08e088 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -482,7 +482,9 @@ def dvcfs(self): def repo_fs(self): from dvc.fs.repo import RepoFileSystem - return RepoFileSystem(self, subrepos=self.subrepos, **self._fs_conf) + return RepoFileSystem( + repo=self, subrepos=self.subrepos, **self._fs_conf + ) @cached_property def index_db_dir(self): @@ -493,7 +495,7 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None): """Opens a specified resource as a file descriptor""" from dvc.fs.repo import RepoFileSystem - fs = RepoFileSystem(self, subrepos=True) + fs = RepoFileSystem(repo=self, subrepos=True) try: with fs.open( path, mode=mode, encoding=encoding, remote=remote diff --git a/dvc/repo/collect.py b/dvc/repo/collect.py index c942bf9e92..58ba06e613 100644 --- a/dvc/repo/collect.py +++ b/dvc/repo/collect.py @@ -34,22 +34,22 @@ def _collect_paths( from dvc.fs.repo import RepoFileSystem from dvc.utils import relpath - fs = RepoFileSystem(repo) + fs_paths = [os.path.abspath(target) for target in targets] + fs = RepoFileSystem(repo=repo) target_paths = [] - for target in targets: - if os.path.isabs(target): - target = relpath(target, repo.root_dir) + for fs_path in fs_paths: + if recursive and fs.isdir(fs_path): + target_paths.extend(fs.find(fs_path)) - if recursive and fs.isdir(target): - target_paths.extend(repo.dvcignore.find(fs, target)) - - if not fs.exists(target): + rel = relpath(fs_path) + if not fs.exists(fs_path): if rev == "workspace" or rev == "": - logger.warning("'%s' was not found in current workspace.", target) + logger.warning("'%s' was not found in current workspace.", rel) else: - logger.warning("'%s' was not found at: '%s'.", target, rev) - target_paths.append(target) + logger.warning("'%s' was not found at: '%s'.", rel, rev) + target_paths.append(fs_path) + return target_paths @@ -58,14 +58,12 @@ def _filter_duplicates( ) -> Tuple[Outputs, StrPaths]: res_outs: Outputs = [] fs_res_paths = fs_paths - from dvc.utils import relpath for out in outs: - rel = relpath(out.fs_path, out.stage.repo.root_dir) - if rel in fs_paths: + if out.fs_path in fs_paths: res_outs.append(out) # MUTATING THE SAME LIST!! - fs_res_paths.remove(rel) + fs_res_paths.remove(out.fs_path) return res_outs, fs_res_paths diff --git a/dvc/repo/diff.py b/dvc/repo/diff.py index f6b6920fc7..44aa3dd3e6 100644 --- a/dvc/repo/diff.py +++ b/dvc/repo/diff.py @@ -24,7 +24,7 @@ def diff(self, a_rev="HEAD", b_rev=None, targets=None): from dvc.fs.repo import RepoFileSystem - repo_fs = RepoFileSystem(self) + repo_fs = RepoFileSystem(repo=self) b_rev = b_rev if b_rev else "workspace" results = {} @@ -188,10 +188,11 @@ def _filter_missing(repo_fs, paths): for path in paths: try: info = repo_fs.info(path) + dvc_info = info.get("dvc_info") if ( - info["isdvc"] + dvc_info and info["type"] == "directory" - and not info["meta"].obj + and not dvc_info["meta"].obj ): yield path except FileNotFoundError: diff --git a/dvc/repo/ls.py b/dvc/repo/ls.py index eba585578b..bed3f27130 100644 --- a/dvc/repo/ls.py +++ b/dvc/repo/ls.py @@ -29,9 +29,9 @@ def ls(url, path=None, rev=None, recursive=None, dvc_only=False): from . import Repo with Repo.open(url, rev=rev, subrepos=True, uninitialized=True) as repo: - fs_path = path or "" + path = path or "" - ret = _ls(repo, fs_path, recursive, dvc_only) + ret = _ls(repo.repo_fs, path, recursive, dvc_only) if path and not ret: raise PathMissingError(path, repo, dvc_only=dvc_only) @@ -44,31 +44,36 @@ def ls(url, path=None, rev=None, recursive=None, dvc_only=False): return ret_list -def _ls(repo, fs_path, recursive=None, dvc_only=False): - fs = repo.repo_fs - infos = [] +def _ls(fs, path, recursive=None, dvc_only=False): + try: + fs_path = fs.info(path)["name"] + except FileNotFoundError: + return {} + + infos = {} for root, dirs, files in fs.walk(fs_path, dvcfiles=True): entries = chain(files, dirs) if not recursive else files - infos.extend(fs.path.join(root, entry) for entry in entries) + + for entry in entries: + entry_fs_path = fs.path.join(root, entry) + relparts = fs.path.relparts(entry_fs_path, fs_path) + name = os.path.join(*relparts) + infos[name] = fs.info(entry_fs_path) + if not recursive: break if not infos and fs.isfile(fs_path): - infos.append(fs_path) + infos[os.path.basename(path)] = fs.info(fs_path) ret = {} - for info in infos: - _info = fs.info(info) - - if _info.get("outs") or not dvc_only: - path = ( - fs.path.name(fs_path) - if fs_path == info - else fs.path.relpath(info, fs_path) - ) - ret[path] = { - "isout": _info.get("isout", False), - "isdir": _info["type"] == "directory", - "isexec": _info["isexec"], + for name, info in infos.items(): + dvc_info = info.get("dvc_info", {}) + if dvc_info.get("outs") or not dvc_only: + ret[name] = { + "isout": dvc_info.get("isout", False), + "isdir": info["type"] == "directory", + "isexec": info.get("isexec", False), } + return ret diff --git a/dvc/repo/metrics/show.py b/dvc/repo/metrics/show.py index 784faacf9b..697048b984 100644 --- a/dvc/repo/metrics/show.py +++ b/dvc/repo/metrics/show.py @@ -76,16 +76,15 @@ def _read_metric(path, fs, rev, **kwargs): def _read_metrics(repo, metrics, rev, onerror=None): - fs = RepoFileSystem(repo) + fs = RepoFileSystem(repo=repo) res = {} for metric in metrics: - fs_path = repo.fs.path.relpath(metric, repo.root_dir) - if not fs.isfile(fs_path): + if not fs.isfile(metric): continue - res[fs.path.relpath(metric, os.getcwd())] = _read_metric( - fs_path, fs, rev, onerror=onerror + res[os.path.relpath(metric, os.getcwd())] = _read_metric( + metric, fs, rev, onerror=onerror ) return res diff --git a/dvc/repo/plots/__init__.py b/dvc/repo/plots/__init__.py index ef684d6ce2..7e3118f4dc 100644 --- a/dvc/repo/plots/__init__.py +++ b/dvc/repo/plots/__init__.py @@ -106,7 +106,7 @@ def _collect_from_revision( ): from dvc.fs.repo import RepoFileSystem - fs = RepoFileSystem(self.repo) + fs = RepoFileSystem(repo=self.repo) plots = _collect_plots(self.repo, targets, revision, recursive) res: Dict[str, Any] = {} for fs_path, rev_props in plots.items(): @@ -240,7 +240,7 @@ def _collect_plots( targets: List[str] = None, rev: str = None, recursive: bool = False, -) -> Dict["List[str]", Dict]: +) -> Dict[str, Dict]: from dvc.repo.collect import collect plots, fs_paths = collect( @@ -251,8 +251,8 @@ def _collect_plots( recursive=recursive, ) - result = {relpath(plot.fs_path, repo.root_dir): _plot_props(plot) for plot in plots} - result.update({relpath(fs_path, repo.root_dir): {} for fs_path in fs_paths}) + result = {plot.fs_path: _plot_props(plot) for plot in plots} + result.update({fs_path: {} for fs_path in fs_paths}) return result diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index c57b3000fc..d4fa759ba4 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -63,7 +63,7 @@ def _get_stage_files(stage: "Stage") -> typing.Iterator[str]: if ( not dep.use_scm_ignore and dep.is_in_repo - and not stage.repo.repo_fs.isdvc(dep.fs.path.relpath(dep.fs_path, stage.repo.root_dir)) + and not stage.repo.repo_fs.isdvc(dep.fs_path) ): yield dep.fs_path for out in stage.outs: diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index b23882876c..05f1ec9a33 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -332,6 +332,13 @@ def resolve_network_drive_windows(path_to_resolve): return os.path.relpath(path, start) +def as_posix(path): + import ntpath + import posixpath + + return path.replace(ntpath.sep, posixpath.sep) + + def env2bool(var, undefined=False): """ undefined: return value if env var is unset diff --git a/tests/func/test_external_repo.py b/tests/func/test_external_repo.py index ae51096e1c..3a5cb2a523 100644 --- a/tests/func/test_external_repo.py +++ b/tests/func/test_external_repo.py @@ -96,7 +96,7 @@ def test_pull_subdir_file(tmp_dir, erepo_dir): dest = tmp_dir / "file" with external_repo(os.fspath(erepo_dir)) as repo: repo.repo_fs.download( - os.path.join("subdir", "file"), + "subdir/file", os.fspath(dest), ) diff --git a/tests/unit/fs/test_repo.py b/tests/unit/fs/test_repo.py index ea3f39600e..70b9e84645 100644 --- a/tests/unit/fs/test_repo.py +++ b/tests/unit/fs/test_repo.py @@ -1,4 +1,5 @@ import os +import posixpath import shutil from unittest import mock @@ -116,8 +117,8 @@ def test_isdir_isfile(tmp_dir, dvc): assert fs.isdir("subdir") assert not fs.isfile("subdir") assert not fs.isdvc("subdir") - assert fs.isfile(os.path.join("subdir", "baz")) - assert fs.isdir(os.path.join("subdir", "data")) + assert fs.isfile("subdir/baz") + assert fs.isdir("subdir/data") def test_exists_isdir_isfile_dirty(tmp_dir, dvc): @@ -129,31 +130,30 @@ def test_exists_isdir_isfile_dirty(tmp_dir, dvc): shutil.rmtree(tmp_dir / "datadir") (tmp_dir / "datafile").unlink() - root = tmp_dir assert fs.exists("datafile") assert fs.exists("datadir") - assert fs.exists(os.path.join("datadir", "foo")) + assert fs.exists("datadir/foo") assert fs.isfile("datafile") assert not fs.isfile("datadir") - assert fs.isfile(os.path.join("datadir", "foo")) + assert fs.isfile("datadir/foo") assert not fs.isdir("datafile") assert fs.isdir("datadir") - assert not fs.isdir(os.path.join("datadir", "foo")) + assert not fs.isdir("datadir/foo") # NOTE: creating file instead of dir and dir instead of file tmp_dir.gen({"datadir": "data", "datafile": {"foo": "foo", "bar": "bar"}}) assert fs.exists("datafile") assert fs.exists("datadir") - assert not fs.exists(os.path.join("datadir", "foo")) - assert fs.exists(os.path.join("datafile", "foo")) + assert not fs.exists("datadir/foo") + assert fs.exists("datafile/foo") assert not fs.isfile("datafile") assert fs.isfile("datadir") - assert not fs.isfile(os.path.join("datadir", "foo")) - assert fs.isfile(os.path.join("datafile", "foo")) + assert not fs.isfile("datadir/foo") + assert fs.isfile("datafile/foo") assert fs.isdir("datafile") assert not fs.isdir("datadir") - assert not fs.isdir(os.path.join("datadir", "foo")) - assert not fs.isdir(os.path.join("datafile", "foo")) + assert not fs.isdir("datadir/foo") + assert not fs.isdir("datafile/foo") def test_isdir_mixed(tmp_dir, dvc): @@ -173,9 +173,9 @@ def test_isdir_mixed(tmp_dir, dvc): ( True, [ - os.path.join("dir", "subdir1", "foo1.dvc"), - os.path.join("dir", "subdir1", "bar1.dvc"), - os.path.join("dir", "subdir2", "foo2.dvc"), + "dir/subdir1/foo1.dvc", + "dir/subdir1/bar1.dvc", + "dir/subdir2/foo2.dvc", ], ), ], @@ -194,21 +194,21 @@ def test_walk(tmp_dir, dvc, dvcfiles, extra_expected): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "subdir1"), - os.path.join("dir", "subdir2"), - os.path.join("dir", "subdir1", "foo1"), - os.path.join("dir", "subdir1", "bar1"), - os.path.join("dir", "subdir2", "foo2"), - os.path.join("dir", "foo"), - os.path.join("dir", "bar"), + "dir/subdir1", + "dir/subdir2", + "dir/subdir1/foo1", + "dir/subdir1/bar1", + "dir/subdir2/foo2", + "dir/foo", + "dir/bar", ] actual = [] for root, dirs, files in fs.walk("dir", dvcfiles=dvcfiles): for entry in dirs + files: - actual.append(os.path.join(root, entry)) + actual.append(posixpath.join(root, entry)) - expected += [os.path.join(path) for path in extra_expected] + expected += extra_expected assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -228,21 +228,21 @@ def test_walk_dirty(tmp_dir, dvc): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "subdir1"), - os.path.join("dir", "subdir2"), - os.path.join("dir", "subdir3"), - os.path.join("dir", "subdir1", "foo1"), - os.path.join("dir", "subdir1", "bar1"), - os.path.join("dir", "subdir2", "foo2"), - os.path.join("dir", "subdir3", "foo3"), - os.path.join("dir", "bar"), - os.path.join("dir", "foo"), + "dir/subdir1", + "dir/subdir2", + "dir/subdir3", + "dir/subdir1/foo1", + "dir/subdir1/bar1", + "dir/subdir2/foo2", + "dir/subdir3/foo3", + "dir/bar", + "dir/foo", ] actual = [] for root, dirs, files in fs.walk("dir"): for entry in dirs + files: - actual.append(os.path.join(root, entry)) + actual.append(posixpath.join(root, entry)) assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -254,14 +254,12 @@ def test_walk_dirty_cached_dir(tmp_dir, scm, dvc): fs = RepoFileSystem(repo=dvc) - data = tmp_dir / "data" - actual = [] for root, dirs, files in fs.walk("data"): for entry in dirs + files: - actual.append(os.path.join(root, entry)) + actual.append(posixpath.join(root, entry)) - expected = [os.path.join("data", "foo"), os.path.join("data", "bar")] + expected = ["data/foo", "data/bar"] assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -281,14 +279,14 @@ def test_walk_mixed_dir(tmp_dir, scm, dvc): fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "foo"), - os.path.join("dir", "bar"), - os.path.join("dir", ".gitignore"), + "dir/foo", + "dir/bar", + "dir/.gitignore", ] actual = [] for root, dirs, files in fs.walk("dir"): for entry in dirs + files: - actual.append(os.path.join(root, entry)) + actual.append(posixpath.join(root, entry)) assert set(actual) == set(expected) assert len(actual) == len(expected) @@ -317,8 +315,8 @@ def test_isdvc(tmp_dir, dvc): assert fs.isdvc("foo") assert not fs.isdvc("bar") assert fs.isdvc("dir") - assert fs.isdvc(os.path.join("dir", "baz")) - assert fs.isdvc(os.path.join("dir", "baz"), recursive=True) + assert fs.isdvc("dir/baz") + assert fs.isdvc("dir/baz", recursive=True) def make_subrepo(dir_, scm, config=None): @@ -353,7 +351,7 @@ def test_subrepos(tmp_dir, scm, dvc, mocker): fs = RepoFileSystem(repo=dvc, subrepos=True) def assert_fs_belongs_to_repo(ret_val): - method = fs._get_repo + method = fs.fs._get_repo def f(*args, **kwargs): r = method(*args, **kwargs) @@ -363,32 +361,32 @@ def f(*args, **kwargs): return f with mock.patch.object( - fs, "_get_repo", side_effect=assert_fs_belongs_to_repo(subrepo1.dvc) + fs.fs, "_get_repo", side_effect=assert_fs_belongs_to_repo(subrepo1.dvc) ): - assert fs.exists(os.path.join("dir", "repo", "foo")) is True - assert fs.exists(os.path.join("dir", "repo", "bar")) is False + assert fs.exists("dir/repo/foo") is True + assert fs.exists("dir/repo/bar") is False - assert fs.isfile(os.path.join("dir", "repo", "foo")) is True - assert fs.isfile(os.path.join("dir", "repo", "dir1", "bar")) is True - assert fs.isfile(os.path.join("dir", "repo", "dir1")) is False + assert fs.isfile("dir/repo/foo") is True + assert fs.isfile("dir/repo/dir1/bar") is True + assert fs.isfile("dir/repo/dir1") is False - assert fs.isdir(os.path.join("dir", "repo", "dir1")) is True - assert fs.isdir(os.path.join("dir", "repo", "dir1", "bar")) is False - assert fs.isdvc(os.path.join("dir", "repo", "foo")) is True + assert fs.isdir("dir/repo/dir1") is True + assert fs.isdir("dir/repo/dir1/bar") is False + assert fs.isdvc("dir/repo/foo") is True with mock.patch.object( - fs, "_get_repo", side_effect=assert_fs_belongs_to_repo(subrepo2.dvc) + fs.fs, "_get_repo", side_effect=assert_fs_belongs_to_repo(subrepo2.dvc) ): - assert fs.exists(os.path.join("dir", "repo2", "lorem")) is True - assert fs.exists(os.path.join("dir", "repo2", "ipsum")) is False + assert fs.exists("dir/repo2/lorem") is True + assert fs.exists("dir/repo2/ipsum") is False - assert fs.isfile(os.path.join("dir", "repo2", "lorem")) is True - assert fs.isfile(os.path.join("dir", "repo2", "dir2", "ipsum")) is True - assert fs.isfile(os.path.join("dir", "repo2", "dir2")) is False + assert fs.isfile("dir/repo2/lorem") is True + assert fs.isfile("dir/repo2/dir2/ipsum") is True + assert fs.isfile("dir/repo2/dir2") is False - assert fs.isdir(os.path.join("dir", "repo2", "dir2")) is True - assert fs.isdir(os.path.join("dir", "repo2", "dir2", "ipsum")) is False - assert fs.isdvc(os.path.join("dir", "repo2", "lorem")) is True + assert fs.isdir("dir/repo2/dir2") is True + assert fs.isdir("dir/repo2/dir2/ipsum") is False + assert fs.isdvc("dir/repo2/lorem") is True @pytest.mark.parametrize( @@ -398,12 +396,12 @@ def f(*args, **kwargs): ( True, [ - os.path.join("dir", "repo", "foo.dvc"), - os.path.join("dir", "repo", ".dvcignore"), - os.path.join("dir", "repo", "dir1.dvc"), - os.path.join("dir", "repo2", ".dvcignore"), - os.path.join("dir", "repo2", "lorem.dvc"), - os.path.join("dir", "repo2", "dir2.dvc"), + "dir/repo/foo.dvc", + "dir/repo/.dvcignore", + "dir/repo/dir1.dvc", + "dir/repo2/.dvcignore", + "dir/repo2/lorem.dvc", + "dir/repo2/dir2.dvc", ], ), ], @@ -432,17 +430,17 @@ def test_subrepo_walk(tmp_dir, scm, dvc, dvcfiles, extra_expected): dvc._reset() fs = RepoFileSystem(repo=dvc) expected = [ - os.path.join("dir", "repo"), - os.path.join("dir", "repo.txt"), - os.path.join("dir", "repo2"), - os.path.join("dir", "repo", ".gitignore"), - os.path.join("dir", "repo", "foo"), - os.path.join("dir", "repo", "dir1"), - os.path.join("dir", "repo", "dir1", "bar"), - os.path.join("dir", "repo2", ".gitignore"), - os.path.join("dir", "repo2", "lorem"), - os.path.join("dir", "repo2", "dir2"), - os.path.join("dir", "repo2", "dir2", "ipsum"), + "dir/repo", + "dir/repo.txt", + "dir/repo2", + "dir/repo/.gitignore", + "dir/repo/foo", + "dir/repo/dir1", + "dir/repo/dir1/bar", + "dir/repo2/.gitignore", + "dir/repo2/lorem", + "dir/repo2/dir2", + "dir/repo2/dir2/ipsum", ] actual = [] @@ -452,7 +450,7 @@ def test_subrepo_walk(tmp_dir, scm, dvc, dvcfiles, extra_expected): ignore_subrepos=False, ): for entry in dirs + files: - actual.append(os.path.join(root, entry)) + actual.append(posixpath.join(root, entry)) expected += extra_expected assert set(actual) == set(expected) @@ -481,27 +479,27 @@ def test_repo_fs_no_subrepos(tmp_dir, dvc, scm): "lorem", "lorem.dvc", "dir", - os.path.join("dir", "repo.txt"), + "dir/repo.txt", ] actual = [] for root, dirs, files in fs.walk("", dvcfiles=True): for entry in dirs + files: - actual.append(os.path.join(root, entry)) + actual.append(posixpath.join(root, entry)) assert set(actual) == set(expected) assert len(actual) == len(expected) assert fs.isfile("lorem") is True - assert fs.isfile(os.path.join("dir", "repo", "foo")) is False - assert fs.isdir(os.path.join("dir", "repo")) is False + assert fs.isfile("dir/repo/foo") is False + assert fs.isdir("dir/repo") is False assert fs.isdir("dir") is True assert fs.isdvc("lorem") is True - assert fs.isdvc(os.path.join("dir", "repo", "dir1")) is False + assert fs.isdvc("dir/repo/dir1") is False - assert fs.exists(os.path.join("dir", "repo.txt")) is True - assert fs.exists(os.path.join("repo", "ipsum")) is False + assert fs.exists("dir/repo.txt") is True + assert fs.exists("repo/ipsum") is False def test_get_hash_cached_file(tmp_dir, dvc, mocker): @@ -540,18 +538,18 @@ def test_get_hash_cached_granular(tmp_dir, dvc, mocker): {"dir": {"foo": "foo", "bar": "bar", "subdir": {"data": "data"}}} ) fs = RepoFileSystem(repo=dvc) - subdir = os.path.join("dir", "subdir") + subdir = "dir/subdir" assert fs.info(subdir).get("md5") is None _, _, obj = stage(dvc.odb.local, subdir, fs, "md5") assert obj.hash_info == HashInfo( "md5", "af314506f1622d107e0ed3f14ec1a3b5.dir" ) - assert fs.info(os.path.join(subdir, "data")).get("md5") is None - _, _, obj = stage(dvc.odb.local, os.path.join(subdir, "data"), fs, "md5") + assert fs.info(posixpath.join(subdir, "data")).get("md5") is None + _, _, obj = stage(dvc.odb.local, posixpath.join(subdir, "data"), fs, "md5") assert obj.hash_info == HashInfo("md5", "8d777f385d3dfec8815d20f7496026dc") (tmp_dir / "dir" / "subdir" / "data").unlink() assert ( - fs.info(os.path.join(subdir, "data"))["md5"] + fs.info(posixpath.join(subdir, "data"))["md5"] == "8d777f385d3dfec8815d20f7496026dc" ) @@ -593,9 +591,7 @@ def test_get_hash_dirty_file(tmp_dir, dvc): # get_file_hash(file) should return workspace hash, not DVC cached hash fs = RepoFileSystem(repo=dvc) assert fs.info("file").get("md5") is None - staging, _, obj = stage( - dvc.odb.local, "file", fs, "md5" - ) + staging, _, obj = stage(dvc.odb.local, "file", fs, "md5") assert obj.hash_info == something_hash_info check(staging, obj) @@ -607,9 +603,7 @@ def test_get_hash_dirty_file(tmp_dir, dvc): # get_file_hash(file) should return DVC cached hash assert fs.info("file")["md5"] == file_hash_info.value - _, hash_info = get_file_hash( - "file", fs, "md5", state=dvc.state - ) + _, hash_info = get_file_hash("file", fs, "md5", state=dvc.state) assert hash_info == file_hash_info # tmp_dir/file can be staged even though it is missing in workspace since @@ -662,14 +656,22 @@ def dvc_structure(suffix): repo_dir.dvc_gen(dvc_files, commit=f"dvc add in {repo_dir}") if traverse_subrepos or repo_dir == tmp_dir: - repo_dir_path = repo_dir.relative_to(tmp_dir).fs_path if repo_dir != tmp_dir else "" + repo_dir_path = ( + repo_dir.relative_to(tmp_dir).as_posix() + if repo_dir != tmp_dir + else "" + ) expected[repo_dir_path] = set( scm_files.keys() | dvc_files.keys() | extras ) # files inside a dvc directory - expected[os.path.join(repo_dir_path, f"dvc-{base}")] = {f"ipsum-{base}"} + expected[posixpath.join(repo_dir_path, f"dvc-{base}")] = { + f"ipsum-{base}" + } # files inside a git directory - expected[os.path.join(repo_dir_path, f"dir-{base}")] = {f"bar-{base}"} + expected[posixpath.join(repo_dir_path, f"dir-{base}")] = { + f"bar-{base}" + } if traverse_subrepos: # update subrepos diff --git a/tests/unit/fs/test_repo_info.py b/tests/unit/fs/test_repo_info.py index 79af258bb8..f50589aac9 100644 --- a/tests/unit/fs/test_repo_info.py +++ b/tests/unit/fs/test_repo_info.py @@ -38,31 +38,29 @@ def repo_fs(tmp_dir, dvc, scm): tmp_dir.scm_gen(fs_structure, commit="repo init") tmp_dir.dvc_gen(dvc_structure, commit="use dvc") - yield RepoFileSystem(dvc, subrepos=True) + yield RepoFileSystem(repo=dvc, subrepos=True) def test_info_not_existing(repo_fs): - path = os.path.join("path", "that", "does", "not", "exist") - with pytest.raises(FileNotFoundError): - repo_fs.info(path) + repo_fs.info("path/that/does/not/exist") @pytest.mark.parametrize( "path", [ "README.md", - os.path.join("models", "train.py"), - os.path.join("models", "test.py"), - os.path.join("src", "utils", "__init__.py"), - os.path.join("src", "utils", "serve_model.py"), + "models/train.py", + "models/test.py", + "src/utils/__init__.py", + "src/utils/serve_model.py", ], ) def test_info_git_tracked_file(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs._root_dir - assert not info["isdvc"] + assert info["repo"].root_dir == repo_fs.repo.root_dir + assert "dvc_info" not in info assert info["type"] == "file" assert not info["isexec"] @@ -70,28 +68,28 @@ def test_info_git_tracked_file(repo_fs, path): @pytest.mark.parametrize( "path", [ - os.path.join("data", "raw", "raw-1.csv"), - os.path.join("data", "raw", "raw-2.csv"), - os.path.join("data", "processed", "processed-1.csv"), - os.path.join("data", "processed", "processed-2.csv"), - os.path.join("models", "transform.pickle"), + "data/raw/raw-1.csv", + "data/raw/raw-2.csv", + "data/processed/processed-1.csv", + "data/processed/processed-2.csv", + "models/transform.pickle", ], ) def test_info_dvc_tracked_file(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs._root_dir - assert info["isdvc"] + assert info["repo"].root_dir == repo_fs.repo.root_dir + assert info["dvc_info"]["isdvc"] assert info["type"] == "file" assert not info["isexec"] -@pytest.mark.parametrize("path", ["src", os.path.join("src", "utils")]) +@pytest.mark.parametrize("path", ["src", "src/utils"]) def test_info_git_only_dirs(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs._root_dir - assert not info["isdvc"] + assert info["repo"].root_dir == repo_fs.repo.root_dir + assert "dvc_info" not in info assert info["type"] == "directory" assert not info["isexec"] @@ -100,8 +98,8 @@ def test_info_git_only_dirs(repo_fs, path): def test_info_git_dvc_mixed_dirs(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs._root_dir - assert not info["isdvc"] + assert info["repo"].root_dir == repo_fs.repo.root_dir + assert not info["dvc_info"]["isdvc"] assert info["type"] == "directory" assert not info["isexec"] @@ -110,15 +108,15 @@ def test_info_git_dvc_mixed_dirs(repo_fs, path): "path", [ "data", - os.path.join("data", "raw"), - os.path.join("data", "processed"), + "data/raw", + "data/processed", ], ) def test_info_dvc_only_dirs(repo_fs, path): info = repo_fs.info(path) - assert info["repo"].root_dir == repo_fs._root_dir - assert info["isdvc"] + assert info["repo"].root_dir == repo_fs.repo.root_dir + assert info["dvc_info"]["isdvc"] assert info["type"] == "directory" assert not info["isexec"] @@ -132,8 +130,8 @@ def test_info_on_subrepos(make_tmp_dir, tmp_dir, dvc, scm, repo_fs): for path in [ "subrepo", - os.path.join("subrepo", "foo"), - os.path.join("subrepo", "foobar"), + "subrepo/foo", + "subrepo/foobar", ]: info = repo_fs.info(path) assert info["repo"].root_dir == str( diff --git a/tests/unit/test_external_repo.py b/tests/unit/test_external_repo.py index c4ff4d4646..4d83495d5b 100644 --- a/tests/unit/test_external_repo.py +++ b/tests/unit/test_external_repo.py @@ -26,7 +26,7 @@ def test_hook_is_called(tmp_dir, erepo_dir, mocker): repo.dvc_gen("bar", "bar", commit=f"dvc add {repo}/bar") with external_repo(str(erepo_dir)) as repo: - spy = mocker.spy(repo.repo_fs, "repo_factory") + spy = mocker.spy(repo.repo_fs.fs, "repo_factory") list(repo.repo_fs.walk("", ignore_subrepos=False)) # drain assert spy.call_count == len(subrepos) @@ -37,7 +37,7 @@ def test_hook_is_called(tmp_dir, erepo_dir, mocker): call( path, fs=repo.fs, - repo_factory=repo.repo_fs.repo_factory, + repo_factory=repo.repo_fs.fs.repo_factory, ) for path in paths ], @@ -63,7 +63,7 @@ def test_subrepo_is_constructed_properly( with external_repo( str(tmp_dir), cache_dir=str(cache_dir), cache_types=["symlink"] ) as repo: - spy = mocker.spy(repo.repo_fs, "repo_factory") + spy = mocker.spy(repo.repo_fs.fs, "repo_factory") list(repo.repo_fs.walk("", ignore_subrepos=False)) # drain assert spy.call_count == 1