Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e201108
erepo: load external repos directly from git tree
pmrowla May 21, 2020
4e9cd92
erepo: add fetch_external
pmrowla May 21, 2020
dde6c61
remote/RepoTree: add callback for keeping track of download counts when
pmrowla May 21, 2020
fad9794
repo: use erepo.fetch_external() in repo.fetch()
pmrowla May 21, 2020
d55c06f
erepo: return cache save_infos in fetch_external
pmrowla May 21, 2020
d265640
DependencyRepo: use erepo.fetch_external and cache.checkout instead of
pmrowla May 21, 2020
a12cbc7
erepo: remove pull_to
pmrowla May 21, 2020
ac7a5fe
DvcTree: support granularity for isdir()
pmrowla May 21, 2020
9d5d754
DvcTree: fix indentation bug
pmrowla May 21, 2020
419f286
RepoTree: add copytree() method
pmrowla May 21, 2020
10aa978
LocalRemote: use working tree for local cache walk_files
pmrowla May 21, 2020
c5f8938
repo: state should be noop for GitTree based erepos
pmrowla May 21, 2020
415aef8
erepo: add get_external()
pmrowla May 21, 2020
56d5c11
test_get: remove unneeded caplog check
pmrowla May 21, 2020
0359d02
erepo: add get_rev()
pmrowla May 21, 2020
092e423
tests: fix erepo test bug
pmrowla May 21, 2020
6a141e2
ls: use RepoTree
pmrowla May 22, 2020
0c951a8
remote: support trees in _collect_dir
pmrowla May 22, 2020
95ea7e2
remote: support trees in get_dir_checksum
pmrowla May 22, 2020
3ed4250
RepoTree: fix dir_cache relpaths in windows
pmrowla May 22, 2020
d6d031c
remote: fix duplicated fetch bug
pmrowla May 22, 2020
f6bf5bf
utils: don't skip md5 dos2unix check for git blobs
pmrowla May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions dvc/dependency/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,24 @@ def _make_repo(self, *, locked=True):
return external_repo(d["url"], rev=rev)

def _get_checksum(self, locked=True):
from dvc.repo.tree import RepoTree

with self._make_repo(locked=locked) as repo:
try:
return repo.find_out_by_relpath(self.def_path).info["md5"]
except OutputNotFoundError:
path = PathInfo(os.path.join(repo.root_dir, self.def_path))

# we want stream but not fetch, so DVC out directories are
# walked, but dir contents are not fetched
tree = RepoTree(repo, stream=True)

# We are polluting our repo cache with some dir listing here
return self.repo.cache.local.get_checksum(path)
if tree.isdir(path):
return self.repo.cache.local.get_dir_checksum(
path, tree=tree
)
return tree.get_file_checksum(path)

def status(self):
current_checksum = self._get_checksum(locked=True)
Expand All @@ -76,16 +87,16 @@ def dumpd(self):
def download(self, to):
with self._make_repo() as repo:
if self.def_repo.get(self.PARAM_REV_LOCK) is None:
self.def_repo[self.PARAM_REV_LOCK] = repo.scm.get_rev()

if hasattr(repo, "cache"):
repo.cache.local.cache_dir = self.repo.cache.local.cache_dir
self.def_repo[self.PARAM_REV_LOCK] = repo.get_rev()

repo.pull_to(self.def_path, to.path_info)
cache = self.repo.cache.local
with repo.use_cache(cache):
_, _, cache_infos = repo.fetch_external([self.def_path])
cache.checkout(to.path_info, cache_infos[0])

def update(self, rev=None):
if rev:
self.def_repo[self.PARAM_REV] = rev

with self._make_repo(locked=False) as repo:
self.def_repo[self.PARAM_REV_LOCK] = repo.scm.get_rev()
self.def_repo[self.PARAM_REV_LOCK] = repo.get_rev()
186 changes: 110 additions & 76 deletions dvc/external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import threading
from contextlib import contextmanager
from distutils.dir_util import copy_tree
from typing import Iterable

from funcy import cached_property, retry, suppress, wrap_with
from funcy import cached_property, retry, wrap_with

from dvc.config import NoRemoteError, NotDvcRepoError
from dvc.exceptions import (
CheckoutError,
FileMissingError,
NoOutputInExternalRepoError,
NoRemoteInExternalRepoError,
Expand All @@ -18,9 +18,10 @@
)
from dvc.path_info import PathInfo
from dvc.repo import Repo
from dvc.repo.tree import RepoTree
from dvc.scm.git import Git
from dvc.utils import tmp_fname
from dvc.utils.fs import fs_copy, move, remove
from dvc.scm.tree import is_working_tree
from dvc.utils.fs import remove

logger = logging.getLogger(__name__)

Expand All @@ -29,10 +30,12 @@
def external_repo(url, rev=None, for_write=False):
logger.debug("Creating external repo %s@%s", url, rev)
path = _cached_clone(url, rev, for_write=for_write)
if not rev:
rev = "HEAD"
try:
repo = ExternalRepo(path, url)
repo = ExternalRepo(path, url, rev, for_write=for_write)
except NotDvcRepoError:
repo = ExternalGitRepo(path, url)
repo = ExternalGitRepo(path, url, rev)

try:
yield repo
Expand Down Expand Up @@ -64,58 +67,98 @@ def clean_repos():
_remove(path)


class ExternalRepo(Repo):
def __init__(self, root_dir, url):
super().__init__(root_dir)
self.url = url
self._set_cache_dir()
self._fix_upstream()

def pull_to(self, path, to_info):
"""
Pull the corresponding file or directory specified by `path` and
checkout it into `to_info`.

It works with files tracked by Git and DVC, and also local files
outside the repository.
"""
out = None
path_info = PathInfo(self.root_dir) / path
class BaseExternalRepo:

with suppress(OutputNotFoundError):
(out,) = self.find_outs_by_path(path_info, strict=False)
_local_cache = None

try:
if out and out.use_cache:
self._pull_cached(out, path_info, to_info)
return
@property
def local_cache(self):
    """Return the local cache that download operations should use.

    When the instance has a ``cache`` attribute, its ``local`` member is
    returned; otherwise the value stored in ``_local_cache`` is used.
    """
    return self.cache.local if hasattr(self, "cache") else self._local_cache

# Check if it is handled by Git (it can't have an absolute path)
if os.path.isabs(path):
raise FileNotFoundError

fs_copy(path_info, to_info)
except FileNotFoundError:
raise PathMissingError(path, self.url)
@contextmanager
def use_cache(self, cache):
"""Use the specified cache in place of the default tmpdir cache for
download operations.
"""
if hasattr(self, "cache"):
save_cache = self.cache.local
self.cache.local = cache
self._local_cache = cache

def _pull_cached(self, out, path_info, dest):
with self.state:
tmp = PathInfo(tmp_fname(dest))
src = tmp / path_info.relative_to(out.path_info)
yield

out.path_info = tmp
if hasattr(self, "cache"):
self.cache.local = save_cache
self._local_cache = None

# Only pull if some of the needed cache is missing
if out.changed_cache(filter_info=src):
self.cloud.pull(out.get_used_cache(filter_info=src))
@cached_property
def repo_tree(self):
    """RepoTree wrapping this repo, constructed with ``fetch=True``."""
    tree = RepoTree(self, fetch=True)
    return tree

def get_rev(self):
    """Return the revision associated with this repo's tree.

    For a working tree the revision is asked from ``self.scm``. Otherwise
    it is read from the ``rev`` attribute of the tree, looking through one
    level of wrapping when the tree exposes an inner ``tree`` attribute.
    """
    if is_working_tree(self.tree):
        rev = self.scm.get_rev()
    elif hasattr(self.tree, "tree"):
        # wrapper tree: the revision lives on the wrapped tree
        rev = self.tree.tree.rev
    else:
        rev = self.tree.rev
    return rev

def fetch_external(self, paths: Iterable, **kwargs):
    """Fetch the specified external repo paths into the local cache.

    Every entry of ``paths`` is joined onto ``self.root_dir``. Each
    resulting path must exist in ``self.repo_tree`` and is then saved into
    ``self.local_cache`` using that tree. ``**kwargs`` is accepted but not
    used.

    Returns a 3-tuple ``(downloaded, failed, cache_infos)``:
      * downloaded: sum of the values reported via the download callback
      * failed: always 0 in this implementation
      * cache_infos: the values returned by ``local_cache.save``, usable
        as checkout targets for the fetched paths

    Raises PathMissingError when a path does not exist in the repo tree.
    """
    targets = [PathInfo(self.root_dir) / rel for rel in paths]

    # values passed to the download callback by the cache while saving
    reported = []
    cache_infos = []
    for target in targets:
        if not self.repo_tree.exists(target):
            raise PathMissingError(target, self.url)
        cache_infos.append(
            self.local_cache.save(
                target,
                None,
                tree=self.repo_tree,
                download_callback=reported.append,
            )
        )

    failed = 0
    return sum(reported), failed, cache_infos

def get_external(self, path, dest):
    """Fetch ``path`` from this repo and place it at ``dest``.

    With a local cache available, the path is fetched into the cache via
    ``fetch_external`` and then checked out to ``dest``. Without one, the
    path is copied straight from the repo tree.

    Raises PathMissingError when the path does not exist in the repo tree.
    """
    if not self.local_cache:
        # git-only erepo with no cache: copy files directly to dest
        src = PathInfo(self.root_dir) / path
        if not self.repo_tree.exists(src):
            raise PathMissingError(src, self.url)
        self.repo_tree.copytree(src, dest)
        return

    # fetch DVC and git files into the cache, then checkout from it
    _downloaded, _failed, cache_infos = self.fetch_external([path])
    self.local_cache.checkout(PathInfo(dest), cache_infos[0])

try:
out.checkout(filter_info=src)
except CheckoutError:
raise FileNotFoundError

move(src, dest)
remove(tmp)
class ExternalRepo(Repo, BaseExternalRepo):
def __init__(self, root_dir, url, rev, for_write=False):
    """Set up the external repo located at ``root_dir``.

    When ``for_write`` is true the base initializer receives ``root_dir``
    unchanged. Otherwise ``root_dir`` is resolved with ``os.path.realpath``
    and the base initializer is additionally given a ``Git`` scm for that
    directory and the requested ``rev``.

    Afterwards ``url`` is stored and the cache dir and upstream are set up
    through ``_set_cache_dir`` and ``_fix_upstream``.
    """
    base_kwargs = {}
    if not for_write:
        root_dir = os.path.realpath(root_dir)
        base_kwargs = {"scm": Git(root_dir), "rev": rev}
    super().__init__(root_dir, **base_kwargs)

    self.url = url
    self._set_cache_dir()
    self._fix_upstream()

@wrap_with(threading.Lock())
def _set_cache_dir(self):
Expand All @@ -125,6 +168,7 @@ def _set_cache_dir(self):
cache_dir = CACHE_DIRS[self.url] = tempfile.mkdtemp("dvc-cache")

self.cache.local.cache_dir = cache_dir
self._local_cache = self.cache.local

def _fix_upstream(self):
if not os.path.isdir(self.url):
Expand Down Expand Up @@ -165,10 +209,11 @@ def _add_upstream(self, src_repo):
self.config["core"]["remote"] = "auto-generated-upstream"


class ExternalGitRepo:
def __init__(self, root_dir, url):
self.root_dir = root_dir
class ExternalGitRepo(BaseExternalRepo):
def __init__(self, root_dir, url, rev):
self.root_dir = os.path.realpath(root_dir)
self.url = url
self.tree = self.scm.get_tree(rev)

@cached_property
def scm(self):
Expand All @@ -181,23 +226,15 @@ def close(self):
def find_out_by_relpath(self, path):
raise OutputNotFoundError(path, self)

def pull_to(self, path, to_info):
try:
# Git handled files can't have absolute path
if os.path.isabs(path):
raise FileNotFoundError

fs_copy(os.path.join(self.root_dir, path), to_info)
except FileNotFoundError:
raise PathMissingError(path, self.url)

@contextmanager
def open_by_relpath(self, path, mode="r", encoding=None, **kwargs):
"""Opens a specified resource as a file object."""
tree = RepoTree(self)
try:
abs_path = os.path.join(self.root_dir, path)
with open(abs_path, mode, encoding=encoding) as fd:
yield fd
with tree.open(
path, mode=mode, encoding=encoding, **kwargs
) as fobj:
yield fobj
except FileNotFoundError:
raise PathMissingError(path, self.url)

Expand All @@ -209,26 +246,23 @@ def _cached_clone(url, rev, for_write=False):
revision checked out. If for_write is set, prevents reusing this dir via
cache.
"""
if not for_write and Git.is_sha(rev) and (url, rev) in CLONES:
return CLONES[url, rev]

# even if we have already cloned this repo, we may need to
# fetch/fast-forward to get specified rev
clone_path = _clone_default_branch(url, rev)
rev_sha = Git(clone_path).resolve_rev(rev or "HEAD")

if not for_write and (url, rev_sha) in CLONES:
return CLONES[url, rev_sha]
if not for_write and (url) in CLONES:
return CLONES[url]

# Copy to a new dir to keep the clone clean
repo_path = tempfile.mkdtemp("dvc-erepo")
logger.debug("erepo: making a copy of %s clone", url)
copy_tree(clone_path, repo_path)

# Check out the specified revision
if rev is not None:
if for_write:
_git_checkout(repo_path, rev)

if not for_write:
CLONES[url, rev_sha] = repo_path
else:
CLONES[url] = repo_path
return repo_path


Expand Down
8 changes: 6 additions & 2 deletions dvc/istextfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
TEXT_CHARS = bytes(range(32, 127)) + b"\n\r\t\f\b"


def istextfile(fname, blocksize=512):
def istextfile(fname, blocksize=512, tree=None):
""" Uses heuristics to guess whether the given file is text or binary,
by reading a single block of bytes from the file.
If more than 30% of the chars in the block are non-text, or there
are NUL ('\x00') bytes in the block, assume this is a binary file.
"""
with open(fname, "rb") as fobj:
if tree:
open_func = tree.open
else:
open_func = open
with open_func(fname, "rb") as fobj:
block = fobj.read(blocksize)

if not block:
Expand Down
Loading