diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index c53d0d55ba..f76344f119 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -50,13 +50,24 @@ def _make_repo(self, *, locked=True): return external_repo(d["url"], rev=rev) def _get_checksum(self, locked=True): + from dvc.repo.tree import RepoTree + with self._make_repo(locked=locked) as repo: try: return repo.find_out_by_relpath(self.def_path).info["md5"] except OutputNotFoundError: path = PathInfo(os.path.join(repo.root_dir, self.def_path)) + + # we want stream but not fetch, so DVC out directories are + # walked, but dir contents is not fetched + tree = RepoTree(repo, stream=True) + # We are polluting our repo cache with some dir listing here - return self.repo.cache.local.get_checksum(path) + if tree.isdir(path): + return self.repo.cache.local.get_dir_checksum( + path, tree=tree + ) + return tree.get_file_checksum(path) def status(self): current_checksum = self._get_checksum(locked=True) @@ -76,16 +87,16 @@ def dumpd(self): def download(self, to): with self._make_repo() as repo: if self.def_repo.get(self.PARAM_REV_LOCK) is None: - self.def_repo[self.PARAM_REV_LOCK] = repo.scm.get_rev() - - if hasattr(repo, "cache"): - repo.cache.local.cache_dir = self.repo.cache.local.cache_dir + self.def_repo[self.PARAM_REV_LOCK] = repo.get_rev() - repo.pull_to(self.def_path, to.path_info) + cache = self.repo.cache.local + with repo.use_cache(cache): + _, _, cache_infos = repo.fetch_external([self.def_path]) + cache.checkout(to.path_info, cache_infos[0]) def update(self, rev=None): if rev: self.def_repo[self.PARAM_REV] = rev with self._make_repo(locked=False) as repo: - self.def_repo[self.PARAM_REV_LOCK] = repo.scm.get_rev() + self.def_repo[self.PARAM_REV_LOCK] = repo.get_rev() diff --git a/dvc/external_repo.py b/dvc/external_repo.py index 8f21fa0f0d..2c66be806c 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -4,12 +4,12 @@ import threading from contextlib import contextmanager from distutils.dir_util import copy_tree +from typing import Iterable -from funcy import cached_property, retry, suppress, wrap_with +from funcy import cached_property, retry, wrap_with from dvc.config import NoRemoteError, NotDvcRepoError from dvc.exceptions import ( - CheckoutError, FileMissingError, NoOutputInExternalRepoError, NoRemoteInExternalRepoError, @@ -18,9 +18,10 @@ ) from dvc.path_info import PathInfo from dvc.repo import Repo +from dvc.repo.tree import RepoTree from dvc.scm.git import Git -from dvc.utils import tmp_fname -from dvc.utils.fs import fs_copy, move, remove +from dvc.scm.tree import is_working_tree +from dvc.utils.fs import remove logger = logging.getLogger(__name__) @@ -29,10 +30,12 @@ def external_repo(url, rev=None, for_write=False): logger.debug("Creating external repo %s@%s", url, rev) path = _cached_clone(url, rev, for_write=for_write) + if not rev: + rev = "HEAD" try: - repo = ExternalRepo(path, url) + repo = ExternalRepo(path, url, rev, for_write=for_write) except NotDvcRepoError: - repo = ExternalGitRepo(path, url) + repo = ExternalGitRepo(path, url, rev) try: yield repo @@ -64,58 +67,98 @@ def clean_repos(): _remove(path) -class ExternalRepo(Repo): - def __init__(self, root_dir, url): - super().__init__(root_dir) - self.url = url - self._set_cache_dir() - self._fix_upstream() - - def pull_to(self, path, to_info): - """ - Pull the corresponding file or directory specified by `path` and - checkout it into `to_info`. - - It works with files tracked by Git and DVC, and also local files - outside the repository. - """ - out = None - path_info = PathInfo(self.root_dir) / path +class BaseExternalRepo: - with suppress(OutputNotFoundError): - (out,) = self.find_outs_by_path(path_info, strict=False) + _local_cache = None - try: - if out and out.use_cache: - self._pull_cached(out, path_info, to_info) - return + @property + def local_cache(self): + if hasattr(self, "cache"): + return self.cache.local + return self._local_cache - # Check if it is handled by Git (it can't have an absolute path) - if os.path.isabs(path): - raise FileNotFoundError - - fs_copy(path_info, to_info) - except FileNotFoundError: - raise PathMissingError(path, self.url) + @contextmanager + def use_cache(self, cache): + """Use the specified cache in place of default tmpdir cache for + download operations. + """ + if hasattr(self, "cache"): + save_cache = self.cache.local + self.cache.local = cache + self._local_cache = cache - def _pull_cached(self, out, path_info, dest): - with self.state: - tmp = PathInfo(tmp_fname(dest)) - src = tmp / path_info.relative_to(out.path_info) + yield - out.path_info = tmp + if hasattr(self, "cache"): + self.cache.local = save_cache + self._local_cache = None - # Only pull unless all needed cache is present - if out.changed_cache(filter_info=src): - self.cloud.pull(out.get_used_cache(filter_info=src)) + @cached_property + def repo_tree(self): + return RepoTree(self, fetch=True) + + def get_rev(self): + if is_working_tree(self.tree): + return self.scm.get_rev() + if hasattr(self.tree, "tree"): + return self.tree.tree.rev + return self.tree.rev + + def fetch_external(self, paths: Iterable, **kwargs): + """Fetch specified external repo paths into cache. + + Returns 3-tuple in the form + (downloaded, failed, list(cache_infos)) + where cache_infos can be used as checkout targets for the + fetched paths. + """ + download_results = [] + failed = 0 + + paths = [PathInfo(self.root_dir) / path for path in paths] + + def download_update(result): + download_results.append(result) + + save_infos = [] + for path in paths: + if not self.repo_tree.exists(path): + raise PathMissingError(path, self.url) + save_info = self.local_cache.save( + path, + None, + tree=self.repo_tree, + download_callback=download_update, + ) + save_infos.append(save_info) + + return sum(download_results), failed, save_infos + + def get_external(self, path, dest): + """Convenience wrapper for fetch_external and checkout.""" + if self.local_cache: + # fetch DVC and git files to tmpdir cache, then checkout + _, _, save_infos = self.fetch_external([path]) + self.local_cache.checkout(PathInfo(dest), save_infos[0]) + else: + # git-only erepo with no cache, just copy files directly + # to dest + path = PathInfo(self.root_dir) / path + if not self.repo_tree.exists(path): + raise PathMissingError(path, self.url) + self.repo_tree.copytree(path, dest) - try: - out.checkout(filter_info=src) - except CheckoutError: - raise FileNotFoundError - move(src, dest) - remove(tmp) +class ExternalRepo(Repo, BaseExternalRepo): + def __init__(self, root_dir, url, rev, for_write=False): + if for_write: + super().__init__(root_dir) + else: + root_dir = os.path.realpath(root_dir) + super().__init__(root_dir, scm=Git(root_dir), rev=rev) + self.url = url + self._set_cache_dir() + self._fix_upstream() @wrap_with(threading.Lock()) def _set_cache_dir(self): @@ -125,6 +168,7 @@ def _set_cache_dir(self): cache_dir = CACHE_DIRS[self.url] = tempfile.mkdtemp("dvc-cache") self.cache.local.cache_dir = cache_dir + self._local_cache = self.cache.local def _fix_upstream(self): if not os.path.isdir(self.url): @@ -165,10 +209,11 @@ def _add_upstream(self, src_repo): self.config["core"]["remote"] = "auto-generated-upstream" -class ExternalGitRepo: - def __init__(self, root_dir, url): - self.root_dir = root_dir +class ExternalGitRepo(BaseExternalRepo): + def __init__(self, root_dir, url, rev): + self.root_dir = os.path.realpath(root_dir) self.url = url + self.tree = self.scm.get_tree(rev) @cached_property def scm(self): @@ -181,23 +226,15 @@ def close(self): def find_out_by_relpath(self, path): raise OutputNotFoundError(path, self) - def pull_to(self, path, to_info): - try: - # Git handled files can't have absolute path - if os.path.isabs(path): - raise FileNotFoundError - - fs_copy(os.path.join(self.root_dir, path), to_info) - except FileNotFoundError: - raise PathMissingError(path, self.url) - @contextmanager def open_by_relpath(self, path, mode="r", encoding=None, **kwargs): """Opens a specified resource as a file object.""" + tree = RepoTree(self) try: - abs_path = os.path.join(self.root_dir, path) - with open(abs_path, mode, encoding=encoding) as fd: - yield fd + with tree.open( + path, mode=mode, encoding=encoding, **kwargs + ) as fobj: + yield fobj except FileNotFoundError: raise PathMissingError(path, self.url) @@ -209,14 +246,12 @@ def _cached_clone(url, rev, for_write=False): revision checked out. If for_write is set prevents reusing this dir via cache. """ - if not for_write and Git.is_sha(rev) and (url, rev) in CLONES: - return CLONES[url, rev] - + # even if we have already cloned this repo, we may need to + # fetch/fast-forward to get specified rev clone_path = _clone_default_branch(url, rev) - rev_sha = Git(clone_path).resolve_rev(rev or "HEAD") - if not for_write and (url, rev_sha) in CLONES: - return CLONES[url, rev_sha] + if not for_write and (url) in CLONES: + return CLONES[url] # Copy to a new dir to keep the clone clean repo_path = tempfile.mkdtemp("dvc-erepo") @@ -224,11 +259,10 @@ def _cached_clone(url, rev, for_write=False): copy_tree(clone_path, repo_path) # Check out the specified revision - if rev is not None: + if for_write: _git_checkout(repo_path, rev) - - if not for_write: - CLONES[url, rev_sha] = repo_path + else: + CLONES[url] = repo_path return repo_path diff --git a/dvc/istextfile.py b/dvc/istextfile.py index 2e10592d9c..e4c29fdcf1 100644 --- a/dvc/istextfile.py +++ b/dvc/istextfile.py @@ -7,13 +7,17 @@ TEXT_CHARS = bytes(range(32, 127)) + b"\n\r\t\f\b" -def istextfile(fname, blocksize=512): +def istextfile(fname, blocksize=512, tree=None): """ Uses heuristics to guess whether the given file is text or binary, by reading a single block of bytes from the file. If more than 30% of the chars in the block are non-text, or there are NUL ('\x00') bytes in the block, assume this is a binary file. """ - with open(fname, "rb") as fobj: + if tree: + open_func = tree.open + else: + open_func = open + with open_func(fname, "rb") as fobj: block = fobj.read(blocksize) if not block: diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 843563b3c0..8b338fd07b 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -212,23 +212,33 @@ def _calculate_checksums(self, file_infos): checksums = dict(zip(file_infos, tasks)) return checksums - def _collect_dir(self, path_info): + def _collect_dir(self, path_info, tree=None, save_tree=False, **kwargs): file_infos = set() - for fname in self.walk_files(path_info): + if tree: + walk_files = tree.walk_files + else: + walk_files = self.walk_files + + for fname in walk_files(path_info, **kwargs): if DvcIgnore.DVCIGNORE_FILE == fname.name: raise DvcIgnoreInCollectedDirError(fname.parent) file_infos.add(fname) - checksums = {fi: self.state.get(fi) for fi in file_infos} - not_in_state = { - fi for fi, checksum in checksums.items() if checksum is None - } - - new_checksums = self._calculate_checksums(not_in_state) + if tree: + checksums = {fi: tree.get_file_checksum(fi) for fi in file_infos} + if save_tree: + for fi, checksum in checksums.items(): + self._save_file(fi, checksum, tree=tree, **kwargs) + else: + checksums = {fi: self.state.get(fi) for fi in file_infos} + not_in_state = { + fi for fi, checksum in checksums.items() if checksum is None + } - checksums.update(new_checksums) + new_checksums = self._calculate_checksums(not_in_state) + checksums.update(new_checksums) result = [ { @@ -249,11 +259,14 @@ def _collect_dir(self, path_info): # Sorting the list by path to ensure reproducibility return sorted(result, key=itemgetter(self.PARAM_RELPATH)) - def get_dir_checksum(self, path_info): + def get_dir_checksum(self, path_info, tree=None): if not self.cache: raise RemoteCacheRequiredError(path_info) - dir_info = self._collect_dir(path_info) + dir_info = self._collect_dir(path_info, tree=None) + if tree: + # don't save state entry for path_info if it is a tree path + path_info = None return self._save_dir_info(dir_info, path_info) def _save_dir_info(self, dir_info, path_info=None): @@ -459,14 +472,24 @@ def _do_link(self, from_info, to_info, link_method): "Created '%s': %s -> %s", self.cache_types[0], from_info, to_info, ) - def _save_file(self, path_info, checksum, save_link=True, tree=None): + def _save_file( + self, path_info, checksum, save_link=True, tree=None, **kwargs + ): assert checksum cache_info = self.checksum_to_path_info(checksum) if tree: if self.changed_cache(checksum): with tree.open(path_info, mode="rb") as fobj: - self.copy_fobj(fobj, cache_info) + # if tree has fetch enabled, DVC out will be fetched on + # open and we do not need to read/copy any data + if not ( + tree.isdvc(path_info, strict=False) and tree.fetch + ): + self.copy_fobj(fobj, cache_info) + callback = kwargs.get("download_callback") + if callback: + callback(1) else: if self.changed_cache(checksum): self.move(path_info, cache_info, mode=self.CACHE_MODE) @@ -487,6 +510,7 @@ def _save_file(self, path_info, checksum, save_link=True, tree=None): if not tree or is_working_tree(tree): self.state.save(path_info, checksum) self.state.save(cache_info, checksum) + return {self.PARAM_CHECKSUM: checksum} def _cache_is_copy(self, path_info): """Checks whether cache uses copies.""" @@ -510,9 +534,14 @@ def _cache_is_copy(self, path_info): self.cache_type_confirmed = True return self.cache_types[0] == "copy" - def _save_dir(self, path_info, checksum, save_link=True, tree=None): + def _save_dir( + self, path_info, checksum, save_link=True, tree=None, **kwargs + ): if tree: - checksum = self._save_tree(path_info, tree) + dir_info = self._collect_dir( + path_info, tree=tree, save_tree=True, **kwargs + ) + checksum = self._save_dir_info(dir_info) else: dir_info = self.get_dir_cache(checksum) @@ -530,23 +559,7 @@ def _save_dir(self, path_info, checksum, save_link=True, tree=None): self.state.save(cache_info, checksum) if not tree or is_working_tree(tree): self.state.save(path_info, checksum) - - def _save_tree(self, path_info, tree): - # save tree directory to cache, collect dir cache during walk and - # return the resulting dir checksum - dir_info = [] - for fname in tree.walk_files(path_info): - checksum = tree.get_file_checksum(fname) - file_info = { - self.PARAM_CHECKSUM: checksum, - self.PARAM_RELPATH: fname.relative_to(path_info).as_posix(), - } - self._save_file(fname, checksum, tree=tree) - dir_info.append(file_info) - - return self._save_dir_info( - sorted(dir_info, key=itemgetter(self.PARAM_RELPATH)) - ) + return {self.PARAM_CHECKSUM: checksum} def is_empty(self, path_info): return False @@ -575,20 +588,25 @@ def walk_files(self, path_info): def protect(path_info): pass - def save(self, path_info, checksum_info, save_link=True, tree=None): + def save( + self, path_info, checksum_info, save_link=True, tree=None, **kwargs + ): if path_info.scheme != self.scheme: raise RemoteActionNotImplemented( f"save {path_info.scheme} -> {self.scheme}", self.scheme, ) if tree: - # save checksum will be computed during tree walk - checksum = None + if tree.isdir(path_info): + # save checksum will be computed during tree walk + checksum = None + else: + checksum = tree.get_file_checksum(path_info) else: checksum = checksum_info[self.PARAM_CHECKSUM] - self._save(path_info, checksum, save_link, tree) + return self._save(path_info, checksum, save_link, tree, **kwargs) - def _save(self, path_info, checksum, save_link=True, tree=None): + def _save(self, path_info, checksum, save_link=True, tree=None, **kwargs): if tree: logger.debug("Saving tree path '%s' to cache.", path_info) else: @@ -602,9 +620,10 @@ def _save(self, path_info, checksum, save_link=True, tree=None): isdir = self.isdir if isdir(path_info): - self._save_dir(path_info, checksum, save_link, tree) - return - self._save_file(path_info, checksum, save_link, tree) + return self._save_dir( + path_info, checksum, save_link, tree, **kwargs + ) + return self._save_file(path_info, checksum, save_link, tree, **kwargs) def _handle_transfer_exception( self, from_info, to_info, exception, operation diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 5ae66158fd..43c8df47c9 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -175,9 +175,11 @@ def getsize(path_info): return os.path.getsize(path_info) def walk_files(self, path_info): - assert is_working_tree(self.repo.tree) - - for fname in self.repo.tree.walk_files(path_info): + if self.work_tree: + tree = self.work_tree + else: + tree = self.repo.tree + for fname in tree.walk_files(path_info): yield PathInfo(fname) def get_file_checksum(self, path_info): diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 8c863942f7..145144189c 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -57,8 +57,8 @@ class Repo: from dvc.repo.get_url import get_url from dvc.repo.update import update - def __init__(self, root_dir=None): - from dvc.state import State + def __init__(self, root_dir=None, scm=None, rev=None): + from dvc.state import State, StateNoop from dvc.lock import make_lock from dvc.scm import SCM from dvc.cache import Cache @@ -70,17 +70,24 @@ def __init__(self, root_dir=None): from dvc.utils.fs import makedirs from dvc.stage.cache import StageCache - root_dir = self.find_root(root_dir) + if scm: + # use GitTree instead of WorkingTree as default repo tree instance + tree = scm.get_tree(rev) + self.root_dir = self.find_root(root_dir, tree) + self.scm = scm + self.tree = tree + self.state = StateNoop() + else: + root_dir = self.find_root(root_dir) + self.root_dir = os.path.abspath(os.path.realpath(root_dir)) - self.root_dir = os.path.abspath(os.path.realpath(root_dir)) self.dvc_dir = os.path.join(self.root_dir, self.DVC_DIR) - self.config = Config(self.dvc_dir) - no_scm = self.config["core"].get("no_scm", False) - self.scm = SCM(self.root_dir, no_scm=no_scm) - - self.tree = WorkingTree(self.root_dir) + if not scm: + no_scm = self.config["core"].get("no_scm", False) + self.scm = SCM(self.root_dir, no_scm=no_scm) + self.tree = WorkingTree(self.root_dir) self.tmp_dir = os.path.join(self.dvc_dir, "tmp") self.index_dir = os.path.join(self.tmp_dir, "index") @@ -94,9 +101,11 @@ def __init__(self, root_dir=None): friendly=True, ) - # NOTE: storing state and link_state in the repository itself to avoid - # any possible state corruption in 'shared cache dir' scenario. - self.state = State(self) + if not scm: + # NOTE: storing state and link_state in the repository itself to + # avoid any possible state corruption in 'shared cache dir' + # scenario. + self.state = State(self) self.cache = Cache(self) self.cloud = DataCloud(self) @@ -124,9 +133,14 @@ def __repr__(self): return f"{self.__class__.__name__}: '{self.root_dir}'" @classmethod - def find_root(cls, root=None): + def find_root(cls, root=None, tree=None): root_dir = os.path.realpath(root or os.curdir) + if tree: + if tree.isdir(os.path.join(root_dir, cls.DVC_DIR)): + return root_dir + raise NotDvcRepoError(f"'{root}' does not contain DVC directory") + if not os.path.isdir(root_dir): raise NotDvcRepoError(f"directory '{root}' does not exist") diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index a9534d2a42..dcab08ec47 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -1,9 +1,7 @@ import logging -from dvc.cache import NamedCache from dvc.config import NoRemoteError -from dvc.exceptions import DownloadError, OutputNotFoundError -from dvc.path_info import PathInfo +from dvc.exceptions import DownloadError from dvc.scm.base import CloneError logger = logging.getLogger(__name__) @@ -78,35 +76,15 @@ def _fetch( def _fetch_external(self, repo_url, repo_rev, files, jobs): - from dvc.external_repo import external_repo, ExternalRepo + from dvc.external_repo import external_repo failed, downloaded = 0, 0 try: with external_repo(repo_url, repo_rev) as repo: - is_dvc_repo = isinstance(repo, ExternalRepo) - # gather git-only tracked files if dvc repo - git_files = [] if is_dvc_repo else files - if is_dvc_repo: - repo.cache.local.cache_dir = self.cache.local.cache_dir - with repo.state: - cache = NamedCache() - for name in files: - try: - out = repo.find_out_by_relpath(name) - except OutputNotFoundError: - # try to add to cache if they are git-tracked files - git_files.append(name) - else: - cache.update(out.get_used_cache()) - - try: - downloaded += repo.cloud.pull(cache, jobs=jobs) - except DownloadError as exc: - failed += exc.amount - - d, f = _git_to_cache(self.cache.local, repo.root_dir, git_files) - downloaded += d - failed += f + with repo.use_cache(self.cache.local): + d, f, _ = repo.fetch_external(files, jobs=jobs) + downloaded += d + failed += f except CloneError: failed += 1 logger.exception( @@ -114,27 +92,3 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): ) return downloaded, failed - - -def _git_to_cache(cache, repo_root, files): - """Save files from a git repo directly to the cache.""" - failed = set() - num_downloads = 0 - repo_root = PathInfo(repo_root) - for file in files: - info = cache.save_info(repo_root / file) - if info.get(cache.PARAM_CHECKSUM) is None: - failed.add(file) - continue - - if cache.changed_cache(info[cache.PARAM_CHECKSUM]): - logger.debug("fetched '%s' from '%s' repo", file, repo_root) - num_downloads += 1 - cache.save(repo_root / file, info, save_link=False) - - if failed: - logger.exception( - "failed to fetch data for {}".format(", ".join(failed)) - ) - - return num_downloads, len(failed) diff --git a/dvc/repo/get.py b/dvc/repo/get.py index 973c5c3371..1141b00772 100644 --- a/dvc/repo/get.py +++ b/dvc/repo/get.py @@ -4,7 +4,6 @@ import shortuuid from dvc.exceptions import DvcException -from dvc.path_info import PathInfo from dvc.utils import resolve_output from dvc.utils.fs import remove @@ -51,7 +50,6 @@ def get(url, path, out=None, rev=None): # Also, we can't use theoretical "move" link type here, because # the same cache file might be used a few times in a directory. repo.cache.local.cache_types = ["reflink", "hardlink", "copy"] - - repo.pull_to(path, PathInfo(out)) + repo.get_external(path, out) finally: remove(tmp_dir) diff --git a/dvc/repo/ls.py b/dvc/repo/ls.py index 3fc0abce3b..4564858f7b 100644 --- a/dvc/repo/ls.py +++ b/dvc/repo/ls.py @@ -28,22 +28,13 @@ def ls( } """ from dvc.external_repo import external_repo - from dvc.repo import Repo with external_repo(url, rev) as repo: path_info = PathInfo(repo.root_dir) if path: path_info /= path - ret = {} - if isinstance(repo, Repo): - ret = _ls(repo, path_info, recursive, True) - - nondvc = {} - if not dvc_only: - nondvc = _ls(repo, path_info, recursive, False) - - ret.update(nondvc) + ret = _ls(repo, path_info, recursive, dvc_only) if path and not ret: raise PathMissingError(path, repo, dvc_only=dvc_only) @@ -56,46 +47,52 @@ def ls( return ret_list -def _ls(repo, path_info, recursive=None, dvc=False): - from dvc.ignore import CleanTree - from dvc.repo.tree import DvcTree - from dvc.scm.tree import WorkingTree +def _ls(repo, path_info, recursive=None, dvc_only=False): + from dvc.repo.tree import RepoTree - if dvc: - tree = DvcTree(repo) - else: - tree = CleanTree(WorkingTree(repo.root_dir)) + # use our own RepoTree instance instead of repo.repo_tree since we do not + # want fetch/stream enabled for ls + tree = RepoTree(repo) ret = {} try: - for root, dirs, files in tree.walk(path_info.fspath): + for root, dirs, files in tree.walk(path_info.fspath, dvcfiles=True): for fname in files: info = PathInfo(root) / fname - path = str(info.relative_to(path_info)) - ret[path] = { - "isout": dvc, - "isdir": False, - "isexec": False if dvc else tree.isexec(info.fspath), - } + dvc = tree.isdvc(info) + if dvc or not dvc_only: + path = str(info.relative_to(path_info)) + ret[path] = { + "isout": dvc, + "isdir": False, + "isexec": False if dvc else tree.isexec(info), + } if not recursive: for dname in dirs: info = PathInfo(root) / dname - path = str(info.relative_to(path_info)) - ret[path] = { - "isout": tree.isdvc(info.fspath) if dvc else False, - "isdir": True, - "isexec": False if dvc else tree.isexec(info.fspath), - } + if not dvc_only or ( + tree.dvctree and tree.dvctree.exists(info) + ): + dvc = tree.isdvc(info) + path = str(info.relative_to(path_info)) + ret[path] = { + "isout": dvc, + "isdir": True, + "isexec": False if dvc else tree.isexec(info), + } break except NotADirectoryError: - return { - path_info.name: { - "isout": dvc, - "isdir": False, - "isexec": False if dvc else tree.isexec(path_info.fspath), + dvc = tree.isdvc(path_info) + if dvc or not dvc_only: + return { + path_info.name: { + "isout": dvc, + "isdir": False, + "isexec": False if dvc else tree.isexec(path_info), + } } - } + return {} except FileNotFoundError: return {} diff --git a/dvc/repo/tree.py b/dvc/repo/tree.py index 4e62adf198..361e702ac4 100644 --- a/dvc/repo/tree.py +++ b/dvc/repo/tree.py @@ -7,6 +7,7 @@ from dvc.remote.base import RemoteActionNotImplemented from dvc.scm.tree import BaseTree from dvc.utils import file_md5 +from dvc.utils.fs import copy_fobj_to_file, makedirs logger = logging.getLogger(__name__) @@ -47,7 +48,10 @@ def _get_granular_checksum(self, path, out, remote=None): raise FileNotFoundError dir_cache = out.get_dir_cache(remote=remote) for entry in dir_cache: - if path == out.path_info / entry[out.remote.PARAM_RELPATH]: + entry_relpath = entry[out.remote.PARAM_RELPATH] + if os.name == "nt": + entry_relpath = entry_relpath.replace("/", os.sep) + if path == out.path_info / entry_relpath: return entry[out.remote.PARAM_CHECKSUM] raise FileNotFoundError @@ -80,10 +84,8 @@ def open(self, path, mode="r", encoding="utf-8", remote=None): ) except RemoteActionNotImplemented: pass - cache_info = out.get_used_cache( - filter_info=path, remote=remote - ) - self.repo.cloud.pull(cache_info, remote=remote) + cache_info = out.get_used_cache(filter_info=path, remote=remote) + self.repo.cloud.pull(cache_info, remote=remote) if out.is_dir_checksum: checksum = self._get_granular_checksum(path, out) @@ -105,10 +107,25 @@ def isdir(self, path): path_info = PathInfo(os.path.abspath(path)) outs = self._find_outs(path, strict=False, recursive=True) - if len(outs) != 1 or outs[0].path_info != path_info: + if len(outs) != 1: return True - return outs[0].is_dir_checksum + out = outs[0] + if not out.is_dir_checksum: + if out.path_info != path_info: + return True + return False + + if out.path_info == path_info: + return True + + # for dir checksum, we need to check if this is a file inside the + # directory + try: + self._get_granular_checksum(path, out) + return False + except FileNotFoundError: + return True def isfile(self, path): if not self.exists(path): @@ -141,7 +158,7 @@ def _walk(self, root, trie, topdown=True): else: assert False - def walk(self, top, topdown=True, **kwargs): + def walk(self, top, topdown=True, download_callback=None, **kwargs): from pygtrie import Trie assert topdown @@ -168,10 +185,14 @@ def walk(self, top, topdown=True, **kwargs): if self.fetch: if out.changed_cache(filter_info=top): used_cache = out.get_used_cache(filter_info=top) - self.repo.cloud.pull(used_cache, **kwargs) + downloaded = self.repo.cloud.pull(used_cache, **kwargs) + if download_callback: + download_callback(downloaded) for entry in dir_cache: entry_relpath = entry[out.remote.PARAM_RELPATH] + if os.name == "nt": + entry_relpath = entry_relpath.replace("/", os.sep) path_info = out.path_info / entry_relpath trie[path_info.parts] = None @@ -214,6 +235,18 @@ def __init__(self, repo, **kwargs): # git-only erepo's do not need dvctree self.dvctree = None + @property + def fetch(self): + if self.dvctree: + return self.dvctree.fetch + return False + + @property + def stream(self): + if self.dvctree: + return self.dvctree.stream + return False + def open(self, path, mode="r", encoding="utf-8", **kwargs): if "b" in mode: encoding = None @@ -355,3 +388,26 @@ def get_file_checksum(self, path_info): except OutputNotFoundError: pass return file_md5(path_info, self)[0] + + def copytree(self, top, dest): + top = PathInfo(top) + dest = PathInfo(dest) + + if not self.exists(top): + raise FileNotFoundError + + if self.isfile(top): + makedirs(dest.parent, exist_ok=True) + with self.open(top, mode="rb") as fobj: + copy_fobj_to_file(fobj, dest) + return + + for root, _, files in self.walk(top): + root = PathInfo(root) + dest_dir = root.relative_to(top) + makedirs(dest_dir, exist_ok=True) + for fname in files: + src = root / fname + dest = dest_dir / fname + with self.open(src, mode="rb") as fobj: + copy_fobj_to_file(fobj, dest) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 3055371682..b614d6cbe6 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -52,18 +52,14 @@ def file_md5(fname, tree=None): exists_func = tree.exists stat_func = tree.stat open_func = tree.open - # assume we don't need to run dos2unix when comparing git blobs - binary = True else: exists_func = os.path.exists stat_func = os.stat open_func = open - binary = False if exists_func(fname): hash_md5 = hashlib.md5() - if not binary: - binary = not istextfile(fname) + binary = not istextfile(fname, tree=tree) size = stat_func(fname).st_size no_progress_bar = True if size >= LARGE_FILE_SIZE: diff --git a/tests/func/test_external_repo.py b/tests/func/test_external_repo.py index 3adefbb308..f42a191c08 100644 --- a/tests/func/test_external_repo.py +++ b/tests/func/test_external_repo.py @@ -3,6 +3,7 @@ from mock import patch from dvc.external_repo import external_repo +from dvc.path_info import PathInfo from dvc.remote import LocalRemote from dvc.scm.git import Git from dvc.utils import relpath @@ -89,7 +90,10 @@ def test_pull_subdir_file(tmp_dir, erepo_dir): dest = tmp_dir / "file" with external_repo(os.fspath(erepo_dir)) as repo: - repo.pull_to(os.path.join("subdir", "file"), dest) + _, _, save_infos = repo.fetch_external( + [os.path.join("subdir", "file")] + ) + repo.cache.local.checkout(PathInfo(dest), save_infos[0]) assert dest.is_file() assert dest.read_text() == "contents" diff --git a/tests/func/test_get.py b/tests/func/test_get.py index 0e1ae42269..515dba1366 100644 --- a/tests/func/test_get.py +++ b/tests/func/test_get.py @@ -157,17 +157,13 @@ def test_get_to_dir(tmp_dir, erepo_dir, dname): assert (tmp_dir / dname / "file").read_text() == "contents" -def test_get_from_non_dvc_master(tmp_dir, git_dir, caplog): +def test_get_from_non_dvc_master(tmp_dir, git_dir): with git_dir.chdir(), git_dir.branch("branch", new=True): git_dir.init(dvc=True) git_dir.dvc_gen("some_file", "some text", commit="create some file") - caplog.clear() - - with caplog.at_level(logging.INFO, logger="dvc"): - Repo.get(os.fspath(git_dir), "some_file", out="some_dst", rev="branch") + Repo.get(os.fspath(git_dir), "some_file", out="some_dst", rev="branch") - assert caplog.text == "" assert (tmp_dir / "some_dst").read_text() == "some text"