From 073d4debedab53e33baa554a000f9d51e24fdd85 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 6 Apr 2020 16:40:01 +0900 Subject: [PATCH 01/15] remote: locally index list of checksums available on cloud remotes --- dvc/remote/base.py | 41 ++++++++++++++--- dvc/remote/index.py | 78 +++++++++++++++++++++++++++++++++ dvc/remote/local.py | 10 +++++ dvc/repo/__init__.py | 2 + tests/unit/remote/test_base.py | 2 +- tests/unit/remote/test_index.py | 36 +++++++++++++++ 6 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 dvc/remote/index.py create mode 100644 tests/unit/remote/test_index.py diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c349a7a12b..8fc87c2cf2 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -1,4 +1,5 @@ import errno +import hashlib import itertools import json import logging @@ -23,6 +24,7 @@ from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo from dvc.progress import Tqdm +from dvc.remote.index import RemoteIndex from dvc.remote.slow_link_detection import slow_link_guard from dvc.state import StateNoop from dvc.utils import tmp_fname @@ -111,6 +113,13 @@ def __init__(self, repo, config): self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES) self.cache_type_confirmed = False + url = config.get("url") + if url: + index_name = hashlib.md5(url.encode("utf-8")).hexdigest() + else: + index_name = None + self.index = RemoteIndex(self.repo, index_name) + @classmethod def get_missing_deps(cls): import importlib @@ -745,8 +754,11 @@ def gc(self, named_cache, jobs=None): path_info = self.checksum_to_path_info(checksum) if self.is_dir_checksum(checksum): self._remove_unpacked_dir(checksum) + self.index.remove(checksum) self.remove(path_info) removed = True + if removed: + self.index.save() return removed def is_protected(self, path_info): @@ -865,10 +877,21 @@ def cache_exists(self, checksums, jobs=None, name=None): # cache_exists() (see ssh, local) assert self.TRAVERSE_PREFIX_LEN >= 2 - if len(checksums) == 1 or not self.CAN_TRAVERSE: - return self._cache_object_exists(checksums, jobs, name) + checksums = set(checksums) + indexed_checksums = checksums & self.index.checksums + checksums -= indexed_checksums + logger.debug( + "Matched {} indexed checksums".format(len(indexed_checksums)) + ) + if not checksums: + return indexed_checksums - checksums = frozenset(checksums) + if len(checksums) == 1 or not self.CAN_TRAVERSE: + remote_checksums = self._cache_object_exists(checksums, jobs, name) + if remote_checksums: + self.index.update(remote_checksums) + self.index.save() + return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method remote_size, remote_checksums = self._estimate_cache_size( @@ -900,10 +923,16 @@ def cache_exists(self, checksums, jobs=None, name=None): logger.debug( "Querying {} checksums via traverse".format(len(checksums)) ) - remote_checksums = self._cache_checksums_traverse( - remote_size, remote_checksums, jobs, name + remote_checksums = set( + self._cache_checksums_traverse( + remote_size, remote_checksums, jobs, name + ) + ) + self.index.replace(remote_checksums) + self.index.save() + return list(indexed_checksums) + list( + checksums & set(remote_checksums) ) - return list(checksums & set(remote_checksums)) def _checksums_with_limit( self, limit, prefix=None, progress_callback=None diff --git a/dvc/remote/index.py b/dvc/remote/index.py new file mode 100644 index 0000000000..d829947b90 --- /dev/null +++ b/dvc/remote/index.py @@ -0,0 +1,78 @@ +import logging +import pathlib +import pickle + +logger = logging.getLogger(__name__) + + +class RemoteIndex(object): + """Class for locally indexing remote checksums. + + Args: + repo: repo for this remote + """ + + INDEX_SUFFIX = ".idx" + + def __init__(self, repo, name): + if repo and hasattr(repo, "index_dir") and name: + self.path = pathlib.Path(repo.index_dir).joinpath( + "{}{}".format(name, self.INDEX_SUFFIX) + ) + else: + self.path = None + self._checksums = set() + self.load() + + def __iter__(self): + return iter(self._checksums) + + @property + def checksums(self): + return self._checksums + + def load(self): + """(Re)load this index from disk.""" + if self.path and self.path.exists(): + try: + with open(self.path, "rb") as fd: + self._checksums = pickle.load(fd) + except IOError: + logger.error( + "Failed to load remote index from '{}'".format(self.path) + ) + + def save(self): + """Save this index to disk.""" + if self.path: + try: + with open(self.path, "wb") as fd: + pickle.dump(self._checksums, fd) + except IOError: + logger.error( + "Failed to save remote index to '{}'".format(self.path) + ) + + def invalidate(self): + """Invalidate this index (to force re-indexing later).""" + self._checksums.clear() + if self.path and self.path.exists(): + self.path.unlink() + + def remove(self, checksum): + if checksum in self._checksums: + self._checksums.remove(checksum) + + def replace(self, checksums): + """Replace the full contents of this index with ``checksums``. + + Changes to the index will not be written to disk. + """ + self._checksums = set(checksums) + + def update(self, *checksums): + """Update this index, adding elements from ``checksums``. + + Changes to the index will not be written to disk. + """ + self._checksums.update(*checksums) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index fe0270ef34..615dcd578a 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -369,8 +369,18 @@ def _process( if fails: if download: + remote.index.invalidate() raise DownloadError(fails) raise UploadError(fails) + elif not download: + pushed_checksums = list(map(self.path_to_checksum, plans[0])) + logger.debug( + "Adding {} pushed checksums to index".format( + len(pushed_checksums) + ) + ) + remote.index.update(pushed_checksums) + remote.index.save() return len(plans[0]) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index e7f2b4dfd3..e26f13a58f 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -85,6 +85,8 @@ def __init__(self, root_dir=None): self.tmp_dir = os.path.join(self.dvc_dir, "tmp") makedirs(self.tmp_dir, exist_ok=True) + self.index_dir = os.path.join(self.dvc_dir, "index") + makedirs(self.index_dir, exist_ok=True) hardlink_lock = self.config["core"].get("hardlink_lock", False) self.lock = make_lock( diff --git a/tests/unit/remote/test_base.py b/tests/unit/remote/test_base.py index e8cafa3b26..7334594627 100644 --- a/tests/unit/remote/test_base.py +++ b/tests/unit/remote/test_base.py @@ -52,7 +52,7 @@ def test_cache_exists(object_exists, traverse, dvc): with mock.patch.object( remote, "cache_checksums", return_value=list(range(256)) ): - checksums = list(range(1000)) + checksums = set(range(1000)) remote.cache_exists(checksums) object_exists.assert_called_with(checksums, None, None) traverse.assert_not_called() diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py new file mode 100644 index 0000000000..967b7076a3 --- /dev/null +++ b/tests/unit/remote/test_index.py @@ -0,0 +1,36 @@ +import pickle +import os.path + +from dvc.remote.index import RemoteIndex +from tests.basic_env import TestDvc + + +class TestRemoteIndex(TestDvc): + def test_init(self): + index = RemoteIndex(self.dvc, "foo") + assert str(index.path) == os.path.join(self.dvc.index_dir, "foo.idx") + + def test_load(self): + checksums = {1, 2, 3} + path = os.path.join(self.dvc.index_dir, "foo.idx") + with open(path, "wb") as fd: + pickle.dump(checksums, fd) + index = RemoteIndex(self.dvc, "foo") + assert index.checksums == checksums + + def test_save(self): + index = RemoteIndex(self.dvc, "foo") + index.replace({4, 5, 6}) + index.save() + path = os.path.join(self.dvc.index_dir, "foo.idx") + with open(path, "rb") as fd: + checksums = pickle.load(fd) + assert index.checksums == checksums + + def test_invalidate(self): + index = RemoteIndex(self.dvc, "foo") + index.replace({1, 2, 3}) + index.save() + index.invalidate() + assert not index.checksums + assert not index.path.exists() From c8be39a099272595d8a7adbf7faac3580301d3bd Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 7 Apr 2020 12:36:05 +0900 Subject: [PATCH 02/15] fix review issues - use pytest fixtures - use sha256 digest - s/fd/fobj/ - check more specific IO errors --- dvc/remote/base.py | 2 +- dvc/remote/index.py | 30 +++++++++------ dvc/repo/__init__.py | 3 +- tests/unit/remote/test_index.py | 65 +++++++++++++++++---------------- 4 files changed, 53 insertions(+), 47 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 8fc87c2cf2..35d076a05c 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -115,7 +115,7 @@ def __init__(self, repo, config): url = config.get("url") if url: - index_name = hashlib.md5(url.encode("utf-8")).hexdigest() + index_name = hashlib.sha256(url.encode("utf-8")).hexdigest() else: index_name = None self.index = RemoteIndex(self.repo, index_name) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index d829947b90..dc1a6d7dfa 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -9,13 +9,17 @@ class RemoteIndex(object): """Class for locally indexing remote checksums. Args: - repo: repo for this remote + repo: repo for this remote index. + name: name for this index. If name is provided, this index will be + loaded from and saved to ``.dvc/tmp/index/{name}.idx``. + If name is not provided (i.e. for local remotes), this index will + be kept in memory but not saved to disk. """ INDEX_SUFFIX = ".idx" - def __init__(self, repo, name): - if repo and hasattr(repo, "index_dir") and name: + def __init__(self, repo, name=None): + if name: self.path = pathlib.Path(repo.index_dir).joinpath( "{}{}".format(name, self.INDEX_SUFFIX) ) @@ -33,24 +37,26 @@ def checksums(self): def load(self): """(Re)load this index from disk.""" - if self.path and self.path.exists(): + if self.path and self.path.is_file(): try: - with open(self.path, "rb") as fd: - self._checksums = pickle.load(fd) - except IOError: + with open(self.path, "rb") as fobj: + self._checksums = pickle.load(fobj) + except PermissionError: logger.error( - "Failed to load remote index from '{}'".format(self.path) + "Insufficient permissions to read index file " + "'{}'".format(self.path) ) def save(self): """Save this index to disk.""" if self.path: try: - with open(self.path, "wb") as fd: - pickle.dump(self._checksums, fd) - except IOError: + with open(self.path, "wb") as fobj: + pickle.dump(self._checksums, fobj) + except PermissionError: logger.error( - "Failed to save remote index to '{}'".format(self.path) + "Insufficient permissions to write index file " + "'{}'".format(self.path) ) def invalidate(self): diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index e26f13a58f..6874faee10 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -84,8 +84,7 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) self.tmp_dir = os.path.join(self.dvc_dir, "tmp") - makedirs(self.tmp_dir, exist_ok=True) - self.index_dir = os.path.join(self.dvc_dir, "index") + self.index_dir = os.path.join(self.tmp_dir, "index") makedirs(self.index_dir, exist_ok=True) hardlink_lock = self.config["core"].get("hardlink_lock", False) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 967b7076a3..8799876ee1 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -2,35 +2,36 @@ import os.path from dvc.remote.index import RemoteIndex -from tests.basic_env import TestDvc - - -class TestRemoteIndex(TestDvc): - def test_init(self): - index = RemoteIndex(self.dvc, "foo") - assert str(index.path) == os.path.join(self.dvc.index_dir, "foo.idx") - - def test_load(self): - checksums = {1, 2, 3} - path = os.path.join(self.dvc.index_dir, "foo.idx") - with open(path, "wb") as fd: - pickle.dump(checksums, fd) - index = RemoteIndex(self.dvc, "foo") - assert index.checksums == checksums - - def test_save(self): - index = RemoteIndex(self.dvc, "foo") - index.replace({4, 5, 6}) - index.save() - path = os.path.join(self.dvc.index_dir, "foo.idx") - with open(path, "rb") as fd: - checksums = pickle.load(fd) - assert index.checksums == checksums - - def test_invalidate(self): - index = RemoteIndex(self.dvc, "foo") - index.replace({1, 2, 3}) - index.save() - index.invalidate() - assert not index.checksums - assert not index.path.exists() + + +def test_init(dvc): + index = RemoteIndex(dvc, "foo") + assert str(index.path) == os.path.join(dvc.index_dir, "foo.idx") + + +def test_load(dvc): + checksums = {1, 2, 3} + path = os.path.join(dvc.index_dir, "foo.idx") + with open(path, "wb") as fd: + pickle.dump(checksums, fd) + index = RemoteIndex(dvc, "foo") + assert index.checksums == checksums + + +def test_save(dvc): + index = RemoteIndex(dvc, "foo") + index.replace({4, 5, 6}) + index.save() + path = os.path.join(dvc.index_dir, "foo.idx") + with open(path, "rb") as fd: + checksums = pickle.load(fd) + assert index.checksums == checksums + + +def test_invalidate(dvc): + index = RemoteIndex(dvc, "foo") + index.replace({1, 2, 3}) + index.save() + index.invalidate() + assert not index.checksums + assert not index.path.exists() From 6a8c57e727709af2e5bc43228885dc59b8f95595 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 7 Apr 2020 16:26:09 +0900 Subject: [PATCH 03/15] Use .dir checksums to (in)validate local remote index - `used_cache()`/`get_used_cache()` in repo/stage/output now return tuples of (dir_cache, file_cache) for directories rather than one flat/merged cache - if local index contains directory checksums, they will always be checked on the remote. If an expected .dir checksum is missing, the local index will be invalidated/cleared --- dvc/data_cloud.py | 46 ++++++++++++++++++------------- dvc/external_repo.py | 2 +- dvc/output/base.py | 18 ++++++++----- dvc/remote/local.py | 64 +++++++++++++++++++++++++++++++++----------- dvc/repo/__init__.py | 28 ++++++++++++++----- dvc/repo/fetch.py | 27 ++++++++++++------- dvc/stage.py | 7 +++-- 7 files changed, 131 insertions(+), 61 deletions(-) diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index ea375b65ce..a3976f1452 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,11 +48,12 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push(self, cache, jobs=None, remote=None, show_checksums=False): + def push(self, caches, jobs=None, remote=None, show_checksums=False): """Push data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to push to the cloud. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to push to the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to push to. By default remote from core.remote config option is used. @@ -60,17 +61,18 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False): information messages. """ return self.repo.cache.local.push( - cache, + caches, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, ) - def pull(self, cache, jobs=None, remote=None, show_checksums=False): + def pull(self, caches, jobs=None, remote=None, show_checksums=False): """Pull data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to pull from the cloud. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to pull from the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to pull from. By default remote from core.remote config option is used. @@ -79,28 +81,36 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, jobs=jobs, remote=remote, show_checksums=show_checksums ) if not remote.verify: - self._save_pulled_checksums(cache) + self._save_pulled_checksums(caches) return downloaded_items_num def _save_pulled_checksums(self, cache): - for checksum in cache["local"].keys(): - cache_file = self.repo.cache.local.checksum_to_path_info(checksum) - if self.repo.cache.local.exists(cache_file): - # We can safely save here, as existing corrupted files will be - # removed upon status, while files corrupted during download - # will not be moved from tmp_file (see `RemoteBASE.download()`) - self.repo.state.save(cache_file, checksum) - - def status(self, cache, jobs=None, remote=None, show_checksums=False): + for dir_cache, file_cache in cache: + checksums = set(file_cache["local"].keys()) + if dir_cache is not None: + checksums.update(dir_cache["local"].keys()) + for checksum in checksums: + cache_file = self.repo.cache.local.checksum_to_path_info( + checksum + ) + if self.repo.cache.local.exists(cache_file): + # We can safely save here, as existing corrupted files will + # be removed upon status, while files corrupted during + # download will not be moved from tmp_file + # (see `RemoteBASE.download()`) + self.repo.state.save(cache_file, checksum) + + def status(self, caches, jobs=None, remote=None, show_checksums=False): """Check status of data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to check status for. + caches (list): list of (dir_cache, file_cache) tuples containg + named checksums to check status for. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to compare cache to. By default remote from core.remote config option @@ -110,5 +120,5 @@ def status(self, cache, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, jobs=jobs, remote=remote, show_checksums=show_checksums ) diff --git a/dvc/external_repo.py b/dvc/external_repo.py index a598eb6692..142fcfc9d2 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -104,7 +104,7 @@ def _pull_cached(self, out, path_info, dest): # Only pull unless all needed cache is present if out.changed_cache(filter_info=src): - self.cloud.pull(out.get_used_cache(filter_info=src)) + self.cloud.pull([out.get_used_cache(filter_info=src)]) try: out.checkout(filter_info=src) diff --git a/dvc/output/base.py b/dvc/output/base.py index f6f7856de5..40cc2013e7 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -401,16 +401,22 @@ def get_used_cache(self, **kwargs): In case that the given output is a directory, it will also include the `info` of its files. + + Returns: + 2-tuple of NamedCache objects in the form of + (directory `info`, file `info`). + If the given output is not a directory, the first tuple entry will + be None. """ if not self.use_cache: - return NamedCache() + return None, NamedCache() if self.stage.is_repo_import: cache = NamedCache() (dep,) = self.stage.deps cache.external[dep.repo_pair].add(dep.def_path) - return cache + return None, cache if not self.checksum: msg = ( @@ -429,16 +435,14 @@ def get_used_cache(self, **kwargs): ) ) logger.warning(msg) - return NamedCache() + return None, NamedCache() ret = NamedCache.make(self.scheme, self.checksum, str(self)) if not self.is_dir_checksum: - return ret + return None, ret - ret.update(self._collect_used_dir_cache(**kwargs)) - - return ret + return ret, self._collect_used_dir_cache(**kwargs) @classmethod def _validate_output_path(cls, path): diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 615dcd578a..10de85805f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -5,8 +5,11 @@ from concurrent.futures import ThreadPoolExecutor from functools import partial +from funcy import cat + from shortuuid import uuid +from dvc.cache import NamedCache from dvc.compat import fspath_py35 from dvc.exceptions import DvcException, DownloadError, UploadError from dvc.path_info import PathInfo @@ -250,7 +253,7 @@ def open(path_info, mode="r", encoding=None): def status( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, @@ -259,28 +262,59 @@ def status( logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - md5s = list(named_cache[self.scheme]) + cache = NamedCache() + dir_md5s = {} + md5s = set() + for dir_cache, file_cache in named_caches: + cache.update(file_cache) + md5s.update(file_cache[self.scheme]) + if dir_cache is not None: + cache.update(dir_cache) + checksums = frozenset(dir_cache[self.scheme].keys()) + md5s.update(checksums) + dir_md5s[checksums] = frozenset(file_cache[self.scheme]) logger.debug("Collecting information from local cache...") - local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir) + local_exists = frozenset( + self.cache_exists(md5s, jobs=jobs, name=self.cache_dir) + ) # This is a performance optimization. We can safely assume that, # if the resources that we want to fetch are already cached, # there's no need to check the remote storage for the existence of # those files. - if download and sorted(local_exists) == sorted(md5s): + if download and local_exists == md5s: remote_exists = local_exists else: logger.debug("Collecting information from remote cache...") - remote_exists = list( - remote.cache_exists( - md5s, jobs=jobs, name=str(remote.path_info) - ) + remote_exists = set() + # verify that our index is still valid by checking that any known + # .dir checksums still exist on the remote + verify_md5s = remote.index.checksums.intersection( + cat(dir_md5s.keys()) ) + if verify_md5s: + remote_dir_exists = frozenset( + remote._cache_object_exists(verify_md5s) + ) + md5s -= remote_dir_exists + remote_exists.update(remote_dir_exists) + if remote_dir_exists != verify_md5s: + logger.debug( + "Remote cache missing an expected .dir checksum, " + "clearing local index" + ) + remote.index.invalidate() + if md5s: + remote_exists.update( + remote.cache_exists( + md5s, jobs=jobs, name=str(remote.path_info) + ) + ) ret = { checksum: {"name": checksum if show_checksums else " ".join(names)} - for checksum, names in named_cache[self.scheme].items() + for checksum, names in cache[self.scheme].items() } self._fill_statuses(ret, local_exists, remote_exists) @@ -321,7 +355,7 @@ def _get_plans(self, download, remote, status_info, status): def _process( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, @@ -349,7 +383,7 @@ def _process( jobs = remote.JOBS status_info = self.status( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, @@ -384,18 +418,18 @@ def _process( return len(plans[0]) - def push(self, named_cache, remote, jobs=None, show_checksums=False): + def push(self, named_caches, remote, jobs=None, show_checksums=False): return self._process( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=False, ) - def pull(self, named_cache, remote, jobs=None, show_checksums=False): + def pull(self, named_caches, remote, jobs=None, show_checksums=False): return self._process( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 6874faee10..8e66357238 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -252,12 +252,18 @@ def used_cache( `all_branches`/`all_tags`/`all_commits` to expand the scope. Returns: - A dictionary with Schemes (representing output's location) as keys, + A list of 2-tuples in the form (dir_cache, file_cache). + Each NamedCache object is a dictionary with Schemes + (representing output's location) as keys, and a list with the outputs' `dumpd` as values. + If the given output is not a directory, the first tuple entry + will be None. """ from dvc.cache import NamedCache - cache = NamedCache() + used_caches = [] + # group together file caches which do not have an associated directory + file_caches = NamedCache() for branch in self.brancher( all_branches=all_branches, @@ -275,15 +281,23 @@ def used_cache( suffix = "({})".format(branch) if branch else "" for stage, filter_info in pairs: - used_cache = stage.get_used_cache( + for dir_cache, file_cache in stage.get_used_cache( remote=remote, force=force, jobs=jobs, filter_info=filter_info, - ) - cache.update(used_cache, suffix=suffix) - - return cache + ): + if dir_cache is None: + file_caches.update(file_cache, suffix=suffix) + else: + used_dir = NamedCache() + used_dir.update(dir_cache, suffix=suffix) + used_file = NamedCache() + used_file.update(file_cache, suffix=suffix) + used_caches.append((used_dir, used_file)) + + used_caches.append((None, file_caches)) + return used_caches def _collect_graph(self, stages=None): """Generate a graph by using the given stages on the given directory diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index f77491bb79..ea6a4824be 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -1,12 +1,12 @@ import logging -from dvc.cache import NamedCache +from funcy import concat + from dvc.config import NoRemoteError from dvc.exceptions import DownloadError, OutputNotFoundError from dvc.scm.base import CloneError from dvc.path_info import PathInfo - logger = logging.getLogger(__name__) @@ -59,10 +59,17 @@ def _fetch( except DownloadError as exc: failed += exc.amount - for (repo_url, repo_rev), files in used.external.items(): - d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) - downloaded += d - failed += f + for dir_cache, file_cache in used: + if dir_cache is None: + items = file_cache.external.items() + else: + items = concat( + dir_cache.external.items(), file_cache.external.items() + ) + for (repo_url, repo_rev), files in items: + d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) + downloaded += d + failed += f if failed: raise DownloadError(failed) @@ -82,7 +89,7 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): if is_dvc_repo: repo.cache.local.cache_dir = self.cache.local.cache_dir with repo.state: - cache = NamedCache() + used_cache = [] for name in files: try: out = repo.find_out_by_relpath(name) @@ -90,10 +97,12 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): # try to add to cache if they are git-tracked files git_files.append(name) else: - cache.update(out.get_used_cache()) + used_cache.append(out.get_used_cache()) try: - downloaded += repo.cloud.pull(cache, jobs=jobs) + downloaded += repo.cloud.pull( + used_cache, jobs=jobs + ) except DownloadError as exc: failed += exc.amount diff --git a/dvc/stage.py b/dvc/stage.py index 14de51501a..4a74cc13fb 100644 --- a/dvc/stage.py +++ b/dvc/stage.py @@ -1059,10 +1059,9 @@ def get_all_files_number(self, filter_info=None): ) def get_used_cache(self, *args, **kwargs): - from .cache import NamedCache - cache = NamedCache() + ret = [] for out in self._filter_outs(kwargs.get("filter_info")): - cache.update(out.get_used_cache(*args, **kwargs)) + ret.append(out.get_used_cache(*args, **kwargs)) - return cache + return ret From 7c0c27115a3b94806eff22f050eee5e2176ff237 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 7 Apr 2020 17:13:25 +0900 Subject: [PATCH 04/15] Add --drop-index CLI option for push/pull/status --- dvc/command/data_sync.py | 20 ++++++++++++++ dvc/command/status.py | 1 + dvc/data_cloud.py | 40 ++++++++++++++++++++++++---- dvc/remote/local.py | 57 ++++++++++++++++++++++++++++------------ dvc/repo/fetch.py | 7 ++++- dvc/repo/pull.py | 2 ++ dvc/repo/push.py | 3 ++- dvc/repo/status.py | 7 ++++- 8 files changed, 112 insertions(+), 25 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index 3e6a3b8ed2..cfd8e4522b 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -30,6 +30,7 @@ def run(self): with_deps=self.args.with_deps, force=self.args.force, recursive=self.args.recursive, + drop_index=self.args.drop_index, ) log_summary(stats) except (CheckoutError, DvcException) as exc: @@ -53,6 +54,7 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, + drop_index=self.args.drop_index, ) except DvcException: logger.exception("failed to push data to the cloud") @@ -158,6 +160,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Pull cache for subdirectories of the specified directory.", ) + pull_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the specified remote cache.", + ) pull_parser.set_defaults(func=CmdDataPull) # Push @@ -207,6 +215,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Push cache for subdirectories of specified directory.", ) + push_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the specified remote cache.", + ) push_parser.set_defaults(func=CmdDataPush) # Fetch @@ -324,4 +338,10 @@ def add_parser(subparsers, _parent_parser): default=False, help="Show status for all dependencies of the specified target.", ) + status_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the specified remote cache.", + ) status_parser.set_defaults(func=CmdDataStatus) diff --git a/dvc/command/status.py b/dvc/command/status.py index a441f71369..4585fd2ef4 100644 --- a/dvc/command/status.py +++ b/dvc/command/status.py @@ -49,6 +49,7 @@ def run(self): all_tags=self.args.all_tags, all_commits=self.args.all_commits, with_deps=self.args.with_deps, + drop_index=self.args.drop_index, ) if st: if self.args.quiet: diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index a3976f1452..11b7a04e42 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,7 +48,14 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push(self, caches, jobs=None, remote=None, show_checksums=False): + def push( + self, + caches, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Push data items in a cloud-agnostic way. Args: @@ -65,9 +72,17 @@ def push(self, caches, jobs=None, remote=None, show_checksums=False): jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, + drop_index=drop_index, ) - def pull(self, caches, jobs=None, remote=None, show_checksums=False): + def pull( + self, + caches, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Pull data items in a cloud-agnostic way. Args: @@ -81,7 +96,11 @@ def pull(self, caches, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - caches, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, + jobs=jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) if not remote.verify: @@ -105,7 +124,14 @@ def _save_pulled_checksums(self, cache): # (see `RemoteBASE.download()`) self.repo.state.save(cache_file, checksum) - def status(self, caches, jobs=None, remote=None, show_checksums=False): + def status( + self, + caches, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Check status of data items in a cloud-agnostic way. Args: @@ -120,5 +146,9 @@ def status(self, caches, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - caches, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, + jobs=jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 10de85805f..25eb479d49 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -258,6 +258,7 @@ def status( jobs=None, show_checksums=False, download=False, + drop_index=False, ): logger.debug( "Preparing to collect status from {}".format(remote.path_info) @@ -288,23 +289,27 @@ def status( else: logger.debug("Collecting information from remote cache...") remote_exists = set() - # verify that our index is still valid by checking that any known - # .dir checksums still exist on the remote - verify_md5s = remote.index.checksums.intersection( - cat(dir_md5s.keys()) - ) - if verify_md5s: - remote_dir_exists = frozenset( - remote._cache_object_exists(verify_md5s) + if drop_index: + logger.debug("Clearing local index for remote cache") + remote.index.invalidate() + else: + # verify that our index is still valid by checking that any + # known .dir checksums still exist on the remote + verify_md5s = remote.index.checksums.intersection( + cat(dir_md5s.keys()) ) - md5s -= remote_dir_exists - remote_exists.update(remote_dir_exists) - if remote_dir_exists != verify_md5s: - logger.debug( - "Remote cache missing an expected .dir checksum, " - "clearing local index" + if verify_md5s: + remote_dir_exists = frozenset( + remote._cache_object_exists(verify_md5s) ) - remote.index.invalidate() + md5s -= remote_dir_exists + remote_exists.update(remote_dir_exists) + if remote_dir_exists != verify_md5s: + logger.debug( + "Remote cache missing an expected .dir checksum, " + "clearing local index" + ) + remote.index.invalidate() if md5s: remote_exists.update( remote.cache_exists( @@ -360,6 +365,7 @@ def _process( jobs=None, show_checksums=False, download=False, + drop_index=False, ): logger.debug( "Preparing to {} '{}'".format( @@ -388,6 +394,7 @@ def _process( jobs=jobs, show_checksums=show_checksums, download=download, + drop_index=drop_index, ) plans = self._get_plans(download, remote, status_info, status) @@ -418,22 +425,38 @@ def _process( return len(plans[0]) - def push(self, named_caches, remote, jobs=None, show_checksums=False): + def push( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + drop_index=False, + ): return self._process( named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=False, + drop_index=drop_index, ) - def pull(self, named_caches, remote, jobs=None, show_checksums=False): + def pull( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + drop_index=False, + ): return self._process( named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=True, + drop_index=drop_index, ) @staticmethod diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index ea6a4824be..49ef5d8321 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -21,6 +21,7 @@ def _fetch( all_tags=False, recursive=False, all_commits=False, + drop_index=False, ): """Download data items from a cloud and imported repositories @@ -51,7 +52,11 @@ def _fetch( try: downloaded += self.cloud.pull( - used, jobs, remote=remote, show_checksums=show_checksums + used, + jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) except NoRemoteError: if not used.external and used["local"]: diff --git a/dvc/repo/pull.py b/dvc/repo/pull.py index 4623f6a4ec..36d99f45b6 100644 --- a/dvc/repo/pull.py +++ b/dvc/repo/pull.py @@ -18,6 +18,7 @@ def pull( force=False, recursive=False, all_commits=False, + drop_index=False, ): processed_files_count = self._fetch( targets, @@ -28,6 +29,7 @@ def pull( all_commits=all_commits, with_deps=with_deps, recursive=recursive, + drop_index=drop_index, ) stats = self._checkout( targets=targets, with_deps=with_deps, force=force, recursive=recursive diff --git a/dvc/repo/push.py b/dvc/repo/push.py index 43946d37c0..ed515dd4f5 100644 --- a/dvc/repo/push.py +++ b/dvc/repo/push.py @@ -12,6 +12,7 @@ def push( all_tags=False, recursive=False, all_commits=False, + drop_index=False, ): used = self.used_cache( targets, @@ -24,4 +25,4 @@ def push( jobs=jobs, recursive=recursive, ) - return self.cloud.push(used, jobs, remote=remote) + return self.cloud.push(used, jobs, remote=remote, drop_index=drop_index) diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 40780c66ec..4d48a81ba2 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -45,6 +45,7 @@ def _cloud_status( with_deps=False, all_tags=False, all_commits=False, + drop_index=False, ): """Returns a dictionary with the files that are new or deleted. @@ -86,7 +87,9 @@ def _cloud_status( ) ret = {} - status_info = self.cloud.status(used, jobs, remote=remote) + status_info = self.cloud.status( + used, jobs, remote=remote, drop_index=drop_index + ) for info in status_info.values(): name = info["name"] status = info["status"] @@ -111,6 +114,7 @@ def status( with_deps=False, all_tags=False, all_commits=False, + drop_index=False, ): if cloud or remote: return _cloud_status( @@ -122,6 +126,7 @@ def status( remote=remote, all_tags=all_tags, all_commits=all_commits, + drop_index=drop_index, ) ignored = list( From ca5e554a0a1ce12adc0feb074ac1f27891c3c3fb Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 7 Apr 2020 18:39:26 +0900 Subject: [PATCH 05/15] fix tests --- dvc/output/base.py | 3 ++- dvc/remote/base.py | 19 +++++++++++++------ dvc/repo/__init__.py | 3 ++- dvc/repo/fetch.py | 14 +++++++++++++- dvc/repo/gc.py | 5 ++--- tests/func/test_data_cloud.py | 6 ++++-- tests/unit/command/test_data_sync.py | 4 ++++ tests/unit/command/test_status.py | 2 ++ tests/unit/output/test_output.py | 5 ++++- tests/unit/remote/test_local.py | 2 +- tests/unit/repo/test_repo.py | 15 +++++++++++---- 11 files changed, 58 insertions(+), 20 deletions(-) diff --git a/dvc/output/base.py b/dvc/output/base.py index 40cc2013e7..218f3aa4b2 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -363,8 +363,9 @@ def _collect_used_dir_cache( if self.cache.changed_cache_file(self.checksum): try: + cache = NamedCache.make("local", self.checksum, str(self)) self.repo.cloud.pull( - NamedCache.make("local", self.checksum, str(self)), + [(None, cache)], jobs=jobs, remote=remote, show_checksums=False, diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 35d076a05c..46b339b41e 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -740,12 +740,15 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) - def gc(self, named_cache, jobs=None): - logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs)) - used = self.extract_used_local_checksums(named_cache) + def gc(self, named_caches, jobs=None): + logger.debug("named_caches: {} jobs: {}".format(named_caches, jobs)) + used = self.extract_used_local_checksums(named_caches) if self.scheme != "": - used.update(named_cache[self.scheme]) + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache[self.scheme]) + used.update(file_cache[self.scheme]) removed = False for checksum in self.all(jobs, str(self.path_info)): @@ -1275,8 +1278,12 @@ def unprotect(path_info): def _get_unpacked_dir_names(self, checksums): return set() - def extract_used_local_checksums(self, named_cache): - used = set(named_cache["local"]) + def extract_used_local_checksums(self, named_caches): + used = set() + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache["local"]) + used.update(file_cache["local"]) unpacked = self._get_unpacked_dir_names(used) return used | unpacked diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 8e66357238..ee739ab39c 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -296,7 +296,8 @@ def used_cache( used_file.update(file_cache, suffix=suffix) used_caches.append((used_dir, used_file)) - used_caches.append((None, file_caches)) + if file_caches._items or file_caches.external: + used_caches.append((None, file_caches)) return used_caches def _collect_graph(self, stages=None): diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 49ef5d8321..e4ecb38453 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -59,7 +59,19 @@ def _fetch( drop_index=drop_index, ) except NoRemoteError: - if not used.external and used["local"]: + external = False + local = False + for dir_cache, file_cache in used: + if dir_cache: + if dir_cache.external: + external = True + if dir_cache["local"]: + local = True + if file_cache.external: + external = True + if file_cache["local"]: + local = True + if not external and local: raise except DownloadError as exc: failed += exc.amount diff --git a/dvc/repo/gc.py b/dvc/repo/gc.py index 8f1222fd84..5cd5d2b645 100644 --- a/dvc/repo/gc.py +++ b/dvc/repo/gc.py @@ -1,7 +1,6 @@ import logging from . import locked -from dvc.cache import NamedCache from dvc.exceptions import InvalidArgumentError @@ -60,9 +59,9 @@ def gc( stack.enter_context(repo.lock) stack.enter_context(repo.state) - used = NamedCache() + used = [] for repo in all_repos + [self]: - used.update( + used.extend( repo.used_cache( all_branches=all_branches, with_deps=with_deps, diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 1bf93ab7fb..2d17f27616 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -112,7 +112,7 @@ def _test_cloud(self): out = stage.outs[0] cache = out.cache_path md5 = out.checksum - info = out.get_used_cache() + info = [out.get_used_cache()] stages = self.dvc.add(self.DATA_DIR) self.assertEqual(len(stages), 1) @@ -122,7 +122,9 @@ def _test_cloud(self): cache_dir = out_dir.cache_path name_dir = str(out_dir) md5_dir = out_dir.checksum - info_dir = NamedCache.make(out_dir.scheme, md5_dir, name_dir) + info_dir = [ + (NamedCache.make(out_dir.scheme, md5_dir, name_dir), NamedCache()) + ] with self.cloud.repo.state: # Check status diff --git a/tests/unit/command/test_data_sync.py b/tests/unit/command/test_data_sync.py index 41d464dbd9..2bc083b70a 100644 --- a/tests/unit/command/test_data_sync.py +++ b/tests/unit/command/test_data_sync.py @@ -54,6 +54,7 @@ def test_pull(mocker): "--with-deps", "--force", "--recursive", + "--drop-index", ] ) assert cli_args.func == CmdDataPull @@ -73,6 +74,7 @@ def test_pull(mocker): with_deps=True, force=True, recursive=True, + drop_index=True, ) @@ -91,6 +93,7 @@ def test_push(mocker): "--all-commits", "--with-deps", "--recursive", + "--drop-index", ] ) assert cli_args.func == CmdDataPush @@ -109,4 +112,5 @@ def test_push(mocker): all_commits=True, with_deps=True, recursive=True, + drop_index=True, ) diff --git a/tests/unit/command/test_status.py b/tests/unit/command/test_status.py index 7b8ea33248..2663a6bc7c 100644 --- a/tests/unit/command/test_status.py +++ b/tests/unit/command/test_status.py @@ -17,6 +17,7 @@ def test_cloud_status(mocker): "--all-tags", "--all-commits", "--with-deps", + "--drop-index", ] ) assert cli_args.func == CmdDataStatus @@ -35,4 +36,5 @@ def test_cloud_status(mocker): all_tags=True, all_commits=True, with_deps=True, + drop_index=True, ) diff --git a/tests/unit/output/test_output.py b/tests/unit/output/test_output.py index fb9325471b..42e503ec83 100644 --- a/tests/unit/output/test_output.py +++ b/tests/unit/output/test_output.py @@ -86,5 +86,8 @@ def test_get_used_cache(exists, expected_message, mocker, caplog): ).return_value = exists with caplog.at_level(logging.WARNING, logger="dvc"): - assert isinstance(output.get_used_cache(), NamedCache) + used = output.get_used_cache() + assert isinstance(used, tuple) + assert used[0] is None + assert isinstance(used[1], NamedCache) assert first(caplog.messages) == expected_message diff --git a/tests/unit/remote/test_local.py b/tests/unit/remote/test_local.py index 04e216bde0..6ef1c389bf 100644 --- a/tests/unit/remote/test_local.py +++ b/tests/unit/remote/test_local.py @@ -26,7 +26,7 @@ def test_status_download_optimization(mocker): other_remote.url = "other_remote" other_remote.cache_exists.return_value = [] - remote.status(infos, other_remote, download=True) + remote.status([(None, infos)], other_remote, download=True) assert other_remote.cache_exists.call_count == 0 diff --git a/tests/unit/repo/test_repo.py b/tests/unit/repo/test_repo.py index 985c113ef1..447487f5b0 100644 --- a/tests/unit/repo/test_repo.py +++ b/tests/unit/repo/test_repo.py @@ -37,10 +37,10 @@ def test_used_cache(tmp_dir, dvc, path): from dvc.cache import NamedCache tmp_dir.dvc_gen({"dir": {"subdir": {"file": "file"}, "other": "other"}}) - expected = NamedCache.make( + expected_dir = NamedCache.make( "local", "70922d6bf66eb073053a82f77d58c536.dir", "dir" ) - expected.add( + expected_file = NamedCache.make( "local", "8c7dd922ad47494fc02c388e12c00eac", os.path.join("dir", "subdir", "file"), @@ -48,9 +48,16 @@ def test_used_cache(tmp_dir, dvc, path): with dvc.state: used_cache = dvc.used_cache([path]) + assert isinstance(used_cache, list) + assert len(used_cache) == 1 + assert isinstance(used_cache[0], tuple) + used_dir = used_cache[0][0] + used_file = used_cache[0][1] assert ( - used_cache._items == expected._items - and used_cache.external == expected.external + used_dir._items == expected_dir._items + and used_dir.external == expected_dir.external + and used_file._items == expected_file._items + and used_file.external == expected_file.external ) From f85020bcbfc15b6abf618cd2625fd1c07be45771 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 8 Apr 2020 12:43:38 +0900 Subject: [PATCH 06/15] fix py3.5 issues --- dvc/remote/index.py | 12 ++++++------ tests/unit/remote/test_index.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index dc1a6d7dfa..0a484c3df7 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -1,5 +1,5 @@ import logging -import pathlib +import os import pickle logger = logging.getLogger(__name__) @@ -20,8 +20,8 @@ class RemoteIndex(object): def __init__(self, repo, name=None): if name: - self.path = pathlib.Path(repo.index_dir).joinpath( - "{}{}".format(name, self.INDEX_SUFFIX) + self.path = os.path.join( + repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX) ) else: self.path = None @@ -37,7 +37,7 @@ def checksums(self): def load(self): """(Re)load this index from disk.""" - if self.path and self.path.is_file(): + if self.path and os.path.isfile(self.path): try: with open(self.path, "rb") as fobj: self._checksums = pickle.load(fobj) @@ -62,8 +62,8 @@ def save(self): def invalidate(self): """Invalidate this index (to force re-indexing later).""" self._checksums.clear() - if self.path and self.path.exists(): - self.path.unlink() + if self.path and os.path.isfile(self.path): + os.unlink(self.path) def remove(self, checksum): if checksum in self._checksums: diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 8799876ee1..3f6e9faa04 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -34,4 +34,4 @@ def test_invalidate(dvc): index.save() index.invalidate() assert not index.checksums - assert not index.path.exists() + assert not os.path.exists(index.path) From 50c9743ee820e1d99f6a23fa3ea1dabb4cc28c30 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 8 Apr 2020 16:02:36 +0900 Subject: [PATCH 07/15] push: only upload .dir file after full file contents has been uploaded --- dvc/remote/local.py | 140 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 113 insertions(+), 27 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 25eb479d49..943f4a740e 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -2,10 +2,10 @@ import logging import os import stat -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial -from funcy import cat +from funcy import cat, concat from shortuuid import uuid @@ -260,20 +260,48 @@ def status( download=False, drop_index=False, ): + # Return flattened dict containing all status info + dir_status, file_status, _ = self._status( + named_caches, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=download, + drop_index=drop_index, + ) + return dict(dir_status, **file_status) + + def _status( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + download=False, + drop_index=False, + ): + """Return a tuple of (dir_status_info, file_status_info, dir_mapping). + + dir_status_info contains status for .dir files, file_status_info + contains status for all other files, and dir_mapping is a dict of + {dir_path_info: set(file_path_info...)} which can be used to map + a .dir file to its file contents. + """ logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - cache = NamedCache() + merged_dir_cache = NamedCache() + merged_file_cache = NamedCache() dir_md5s = {} md5s = set() for dir_cache, file_cache in named_caches: - cache.update(file_cache) + merged_file_cache.update(file_cache) md5s.update(file_cache[self.scheme]) if dir_cache is not None: - cache.update(dir_cache) - checksums = frozenset(dir_cache[self.scheme].keys()) - md5s.update(checksums) - dir_md5s[checksums] = frozenset(file_cache[self.scheme]) + merged_dir_cache.update(dir_cache) + for checksum in dir_cache[self.scheme]: + dir_md5s[checksum] = frozenset(file_cache[self.scheme]) + md5s.update(dir_md5s.keys()) logger.debug("Collecting information from local cache...") local_exists = frozenset( @@ -317,15 +345,27 @@ def status( ) ) - ret = { - checksum: {"name": checksum if show_checksums else " ".join(names)} - for checksum, names in cache[self.scheme].items() - } - self._fill_statuses(ret, local_exists, remote_exists) - - self._log_missing_caches(ret) - - return ret + def cache_to_dict(cache): + return { + checksum: { + "name": checksum if show_checksums else " ".join(names) + } + for checksum, names in cache[self.scheme].items() + } + + dir_status = cache_to_dict(merged_dir_cache) + file_status = cache_to_dict(merged_file_cache) + self._fill_statuses(dir_status, local_exists, remote_exists) + self._fill_statuses(file_status, local_exists, remote_exists) + + self._log_missing_caches(dict(dir_status, **file_status)) + + dir_paths = {} + for md5, file_md5s in dir_md5s.items(): + dir_paths[remote.checksum_to_path_info(md5)] = frozenset( + map(remote.checksum_to_path_info, file_md5s) + ) + return dir_status, file_status, dir_paths @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): @@ -388,7 +428,7 @@ def _process( if jobs is None: jobs = remote.JOBS - status_info = self.status( + dir_status, file_status, dir_paths = self._status( named_caches, remote, jobs=jobs, @@ -397,16 +437,45 @@ def _process( drop_index=drop_index, ) - plans = self._get_plans(download, remote, status_info, status) + dir_plans = self._get_plans(download, remote, dir_status, status) + file_plans = self._get_plans(download, remote, file_status, status) - if len(plans[0]) == 0: + if len(dir_plans[0]) + len(file_plans[0]) == 0: return 0 - if jobs > 1: - with ThreadPoolExecutor(max_workers=jobs) as executor: - fails = sum(executor.map(func, *plans)) - else: - fails = sum(map(func, *plans)) + with ThreadPoolExecutor(max_workers=jobs) as executor: + if download: + fails = sum(executor.map(func, *dir_plans)) + fails += sum(executor.map(func, *file_plans)) + else: + # for uploads, push files first, and any .dir files last + + file_futures = {} + for from_info, to_info, name in zip(*file_plans): + file_futures[to_info] = executor.submit( + func, from_info, to_info, name + ) + dir_futures = {} + for from_info, to_info, name in zip(*dir_plans): + wait_futures = { + future + for file_path, future in file_futures.items() + if file_path in dir_paths[to_info] + } + dir_futures[to_info] = executor.submit( + self._dir_upload, + func, + wait_futures, + from_info, + to_info, + name, + ) + fails = sum( + future.result() + for future in concat( + file_futures.values(), dir_futures.values() + ) + ) if fails: if download: @@ -414,7 +483,9 @@ def _process( raise DownloadError(fails) raise UploadError(fails) elif not download: - pushed_checksums = list(map(self.path_to_checksum, plans[0])) + pushed_checksums = list( + map(self.path_to_checksum, dir_plans[0] + file_plans[0]) + ) logger.debug( "Adding {} pushed checksums to index".format( len(pushed_checksums) @@ -423,7 +494,22 @@ def _process( remote.index.update(pushed_checksums) remote.index.save() - return len(plans[0]) + return len(dir_plans[0]) + len(file_plans[0]) + + def _dir_upload(self, func, futures, from_info, to_info, name): + for future in as_completed(futures): + if future.result(): + # do not upload this .dir file if any file in this + # directory failed to upload + logger.debug( + "failed to upload full contents of '{}', " + "aborting .dir file upload".format(name) + ) + logger.error( + "failed to upload '{}' to '{}'".format(from_info, to_info) + ) + return 1 + return func(from_info, to_info, name) def push( self, From 20e85105e3632936a3fab6c770a94909c4580761 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 8 Apr 2020 16:14:49 +0900 Subject: [PATCH 08/15] gc: always remove .dir checksums first - invalidate local index if any object was removed from the remote --- dvc/remote/base.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 46b339b41e..f448b25fff 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -741,7 +741,6 @@ def all(self, jobs=None, name=None): ) def gc(self, named_caches, jobs=None): - logger.debug("named_caches: {} jobs: {}".format(named_caches, jobs)) used = self.extract_used_local_checksums(named_caches) if self.scheme != "": @@ -751,7 +750,12 @@ def gc(self, named_caches, jobs=None): used.update(file_cache[self.scheme]) removed = False - for checksum in self.all(jobs, str(self.path_info)): + # checksums must be sorted to ensure we always remove .dir files first + for checksum in sorted( + self.all(jobs, str(self.path_info)), + key=self.is_dir_checksum, + reverse=True, + ): if checksum in used: continue path_info = self.checksum_to_path_info(checksum) @@ -761,7 +765,7 @@ def gc(self, named_caches, jobs=None): self.remove(path_info) removed = True if removed: - self.index.save() + self.index.invalidate() return removed def is_protected(self, path_info): From a08f6b7ba65e5b4087b0b5c6a62c78c37240dd36 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 9 Apr 2020 14:14:59 +0900 Subject: [PATCH 09/15] fix review issues --- dvc/command/data_sync.py | 2 +- dvc/remote/index.py | 47 +++++++++++++++++++++++++++++++++------- dvc/remote/local.py | 7 ++---- 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index cfd8e4522b..ca6430facc 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -164,7 +164,7 @@ def add_parser(subparsers, _parent_parser): "--drop-index", action="store_true", default=False, - help="Drop local index for the specified remote cache.", + help="Drop local index for the specified remote.", ) pull_parser.set_defaults(func=CmdDataPull) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 0a484c3df7..53936e9ee1 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -1,6 +1,7 @@ import logging import os import pickle +import threading logger = logging.getLogger(__name__) @@ -25,7 +26,9 @@ def __init__(self, repo, name=None): ) else: self.path = None + self.lock = threading.Lock() self._checksums = set() + self.modified = False self.load() def __iter__(self): @@ -38,47 +41,75 @@ def checksums(self): def load(self): """(Re)load this index from disk.""" if self.path and os.path.isfile(self.path): + self.lock.acquire() try: with open(self.path, "rb") as fobj: self._checksums = pickle.load(fobj) - except PermissionError: + self.modified = False + except IOError as exc: logger.error( - "Insufficient permissions to read index file " - "'{}'".format(self.path) + "Failed to load remote index file '{}'. " + "Remote will be re-indexed: {}".format(self.path, exc) ) + finally: + self.lock.release() def save(self): """Save this index to disk.""" - if self.path: + if self.path and self.modified: + self.lock.acquire() try: with open(self.path, "wb") as fobj: pickle.dump(self._checksums, fobj) - except PermissionError: + self.modified = False + except IOError as exc: logger.error( - "Insufficient permissions to write index file " - "'{}'".format(self.path) + "Failed to save remote index file '{}': {}".format( + self.path, exc + ) ) + finally: + self.lock.release() def invalidate(self): """Invalidate this index (to force re-indexing later).""" + self.lock.acquire() self._checksums.clear() + self.modified = True if self.path and os.path.isfile(self.path): - os.unlink(self.path) + try: + os.unlink(self.path) + except IOError as exc: + logger.error( + "Failed to remove remote index file '{}': {}".format( + self.path, exc + ) + ) + self.lock.release() def remove(self, checksum): if checksum in self._checksums: + self.lock.acquire() self._checksums.remove(checksum) + self.modified = True + self.lock.release() def replace(self, checksums): """Replace the full contents of this index with ``checksums``. Changes to the index will not be written to disk. """ + self.lock.acquire() self._checksums = set(checksums) + self.modified = True + self.lock.release() def update(self, *checksums): """Update this index, adding elements from ``checksums``. Changes to the index will not be written to disk. """ + self.lock.acquire() self._checksums.update(*checksums) + self.modified = True + self.lock.release() diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 943f4a740e..acdd099ef8 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -501,12 +501,9 @@ def _dir_upload(self, func, futures, from_info, to_info, name): if future.result(): # do not upload this .dir file if any file in this # directory failed to upload - logger.debug( - "failed to upload full contents of '{}', " - "aborting .dir file upload".format(name) - ) logger.error( - "failed to upload '{}' to '{}'".format(from_info, to_info) + "one or more files failed while uploading " + "'{}' to '{}'".format(from_info, to_info) ) return 1 return func(from_info, to_info, name) From 8e4eed53a8cdac8d7561f542943e9ab4124bc48c Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 9 Apr 2020 14:17:38 +0900 Subject: [PATCH 10/15] only save index once per command - only write changes if index was actually modified since last save/load --- dvc/remote/base.py | 2 -- dvc/remote/local.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index f448b25fff..c3b76951f5 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -897,7 +897,6 @@ def cache_exists(self, checksums, jobs=None, name=None): remote_checksums = self._cache_object_exists(checksums, jobs, name) if remote_checksums: self.index.update(remote_checksums) - self.index.save() return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method @@ -936,7 +935,6 @@ def cache_exists(self, checksums, jobs=None, name=None): ) ) self.index.replace(remote_checksums) - self.index.save() return list(indexed_checksums) + list( checksums & set(remote_checksums) ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index acdd099ef8..28266cb97b 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -269,6 +269,7 @@ def status( download=download, drop_index=drop_index, ) + remote.index.save() return dict(dir_status, **file_status) def _status( From 38a80e362a8fe8ad332b64c35b221f2a786d4350 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 9 Apr 2020 19:34:42 +0900 Subject: [PATCH 11/15] write checksums to a flat file instead of using pickle - format contains simple header w/checksum counts - checksums are packed as 4-bytes and then compressed w/gzip --- dvc/remote/base.py | 4 +- dvc/remote/index.py | 166 +++++++++++++++++++++++++++++--- tests/unit/remote/test_index.py | 45 +++++---- 3 files changed, 184 insertions(+), 31 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c3b76951f5..121d06928e 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -118,7 +118,9 @@ def __init__(self, repo, config): index_name = hashlib.sha256(url.encode("utf-8")).hexdigest() else: index_name = None - self.index = RemoteIndex(self.repo, index_name) + self.index = RemoteIndex( + self.repo, index_name, self.CHECKSUM_DIR_SUFFIX + ) @classmethod def get_missing_deps(cls): diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 53936e9ee1..f692e40428 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -1,11 +1,142 @@ import logging import os -import pickle +import struct import threading +import zlib +from binascii import unhexlify + +from funcy import chunks, concat, split, split_at + +from dvc.exceptions import DvcException logger = logging.getLogger(__name__) +DEFAULT_PROTOCOL = 1 +SUPPORTED_PROTOCOLS = [1] + +# Index format (v1) is the following: +# +# (all integer types are little-endian) +# +# Header +# ------ +# protocol_version (32-bit uint): index file protocol version +# dir_checksum_count (64-bit uint): number of .dir checksums in data section +# file_checksum_count (64-bit uint): number of file checksums in data section +# compressed_len (64-bit uint): length of data section +# crc - 32-bit CRC32 checksum of uncompressed data +# +# Data (zlib compressed) +# ---------------------- +# array of 128-bit MD5 .dir checksums +# array of 128-bit MD5 file checksums +_header_v1 = struct.Struct(" Date: Fri, 10 Apr 2020 13:13:57 +0900 Subject: [PATCH 12/15] no need to try to compress md5 hashes --- dvc/remote/index.py | 76 ++++++++++++--------------------- tests/unit/remote/test_index.py | 2 +- 2 files changed, 28 insertions(+), 50 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index f692e40428..93932e1ea6 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -5,7 +5,7 @@ import zlib from binascii import unhexlify -from funcy import chunks, concat, split, split_at +from funcy import chunks, concat, split from dvc.exceptions import DvcException @@ -24,15 +24,13 @@ # protocol_version (32-bit uint): index file protocol version # dir_checksum_count (64-bit uint): number of .dir checksums in data section # file_checksum_count (64-bit uint): number of file checksums in data section -# compressed_len (64-bit uint): length of data section # crc - 32-bit CRC32 checksum of uncompressed data # -# Data (zlib compressed) +# Data (total length = 16 * dir_checksum_count * file_checksum_count bytes) # ---------------------- # array of 128-bit MD5 .dir checksums # array of 128-bit MD5 file checksums -_header_v1 = struct.Struct(" 0: + data = fobj.read(min(bytes_remaining, 16384)) + bytes_remaining -= len(data) yield data - data = extra + decompress.flush() - crc = zlib.crc32(data, crc) - if crc != file_crc: - raise DvcException("Remote index file failed CRC check") - return data - for data in read_chunks(): + for data in read_chunks(dir_count): + crc = zlib.crc32(data, crc) checksums = chunks(32, data.hex()) - if len(dir_checksums) < dir_count: - dirs, files = split_at(dir_count - len(dir_checksums), checksums) - dir_checksums.update(checksum + dir_suffix for checksum in dirs) - file_checksums.update(files) - else: - file_checksums.update(checksums) + dir_checksums.update(checksum + dir_suffix for checksum in checksums) + for data in read_chunks(file_count): + crc = zlib.crc32(data, crc) + checksums = chunks(32, data.hex()) + file_checksums.update(checksums) + + if crc != file_crc: + raise DvcException("Remote index file failed CRC check") return dir_checksums, file_checksums diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 07e3efbbfd..333d9e7f07 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -7,7 +7,7 @@ def test_protocol_v1_roundtrip(tmp_dir): tmpfile = os.path.join(tmp_dir, "foo.idx") expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) - expected_file = frozenset(map(_to_checksum, range(2000))) + expected_file = frozenset(map(_to_checksum, range(10000))) with open(tmpfile, "wb") as fobj: dump(expected_dir, expected_file, fobj) with open(tmpfile, "rb") as fobj: From a6f587769c42d71b062bc62a81d547127b5af2dc Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 10 Apr 2020 14:13:43 +0900 Subject: [PATCH 13/15] add helper functions update_all/replace_all - there are some places in dvc where we already have separated dir/file checksums and some places that we do not, use update()/replace() where we already have them separated, and update_all()/replace_all() when we have to do the filtering ourself in RemoteIndex --- dvc/remote/base.py | 5 ++-- dvc/remote/index.py | 36 +++++++++++++--------- dvc/remote/local.py | 11 ++++--- tests/unit/remote/test_index.py | 53 +++++++++++++++++++++++++++++++-- 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 121d06928e..6b1fd62764 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -763,7 +763,6 @@ def gc(self, named_caches, jobs=None): path_info = self.checksum_to_path_info(checksum) if self.is_dir_checksum(checksum): self._remove_unpacked_dir(checksum) - self.index.remove(checksum) self.remove(path_info) removed = True if removed: @@ -898,7 +897,7 @@ def cache_exists(self, checksums, jobs=None, name=None): if len(checksums) == 1 or not self.CAN_TRAVERSE: remote_checksums = self._cache_object_exists(checksums, jobs, name) if remote_checksums: - self.index.update(remote_checksums) + self.index.update_all(remote_checksums) return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method @@ -936,7 +935,7 @@ def cache_exists(self, checksums, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) ) - self.index.replace(remote_checksums) + self.index.replace_all(remote_checksums) return list(indexed_checksums) + list( checksums & set(remote_checksums) ) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 93932e1ea6..5a4a334eef 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -124,6 +124,7 @@ class RemoteIndex(object): loaded from and saved to ``.dvc/tmp/index/{name}.idx``. If name is not provided (i.e. for local remotes), this index will be kept in memory but not saved to disk. + dir_suffix: suffix used for naming directory checksums """ INDEX_SUFFIX = ".idx" @@ -204,32 +205,39 @@ def invalidate(self): ) self.lock.release() - def remove(self, checksum): - if checksum in self._checksums: - self.lock.acquire() - self._checksums.remove(checksum) - self.modified = True - self.lock.release() - - def replace(self, checksums): - """Replace the full contents of this index with ``checksums``. + def replace(self, dir_checksums, file_checksums): + """Replace the contents of this index with the specified checksums. Changes to the index will not be written to disk. """ self.lock.acquire() - self._dir_checksums = set() - self._file_checksums = set() - self.update(checksums) + self._dir_checksums = set(dir_checksums) + self._file_checksums = set(file_checksums) self.lock.release() - def update(self, *checksums): - """Update this index, adding elements from ``checksums``. + def replace_all(self, *checksums): + """Replace the contents of this index with the specified checksums. Changes to the index will not be written to disk. """ dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) + self.replace(dir_checksums, file_checksums) + + def update(self, dir_checksums, file_checksums): + """Update this index, adding the specified checksums. + + Changes to the index will not be written to disk. + """ self.lock.acquire() self._dir_checksums.update(dir_checksums) self._file_checksums.update(file_checksums) self.modified = True self.lock.release() + + def update_all(self, *checksums): + """Update this index, adding the specified checksums. + + Changes to the index will not be written to disk. + """ + dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) + self.update(dir_checksums, file_checksums) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 28266cb97b..5882bcd0b9 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -484,15 +484,18 @@ def _process( raise DownloadError(fails) raise UploadError(fails) elif not download: - pushed_checksums = list( - map(self.path_to_checksum, dir_plans[0] + file_plans[0]) + pushed_dir_checksums = list( + map(self.path_to_checksum, dir_plans[0]) + ) + pushed_file_checksums = list( + map(self.path_to_checksum, file_plans[0]) ) logger.debug( "Adding {} pushed checksums to index".format( - len(pushed_checksums) + len(pushed_dir_checksums) + len(pushed_file_checksums) ) ) - remote.index.update(pushed_checksums) + remote.index.update(pushed_dir_checksums, pushed_file_checksums) remote.index.save() return len(dir_plans[0]) + len(file_plans[0]) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 333d9e7f07..bbb61b4f1f 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -29,7 +29,7 @@ def test_roundtrip(dvc): expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) index = RemoteIndex(dvc, "foo") - index.replace(expected_dir | expected_file) + index.replace(expected_dir, expected_file) index.save() index.load() assert index._dir_checksums == expected_dir @@ -39,8 +39,57 @@ def test_roundtrip(dvc): def test_invalidate(dvc): index = RemoteIndex(dvc, "foo") - index.replace({_to_checksum(n) for n in (1, 2, 3)}) + index.replace( + ["0123456789abcdef0123456789abcdef.dir"], + ["fedcba9876543210fedcba9876543210"], + ) index.save() index.invalidate() assert not index.checksums assert not os.path.exists(index.path) + + +def test_replace(dvc): + index = RemoteIndex(dvc, "foo") + index._dir_checksums = set() + index._file_checksums = set() + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(expected_dir, expected_file) + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + + +def test_replace_all(dvc): + index = RemoteIndex(dvc, "foo") + index._dir_checksums = set() + index._file_checksums = set() + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace_all(expected_dir | expected_file) + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + + +def test_update(dvc): + index = RemoteIndex(dvc, "foo") + initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(initial_dir, initial_file) + expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) + index.update(expected_dir, expected_file) + assert index._dir_checksums == initial_dir | expected_dir + assert index._file_checksums == initial_file | expected_file + + +def test_update_all(dvc): + index = RemoteIndex(dvc, "foo") + initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(initial_dir, initial_file) + expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) + index.update_all(expected_dir | expected_file) + assert index._dir_checksums == initial_dir | expected_dir + assert index._file_checksums == initial_file | expected_file From 199cdba424a82e3924d0d0b74d83d46b0bbadc13 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 10 Apr 2020 15:06:57 +0900 Subject: [PATCH 14/15] review docstring fixes --- dvc/command/data_sync.py | 4 ++-- dvc/data_cloud.py | 3 +++ dvc/remote/index.py | 10 +++++----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index ca6430facc..ff9fbd7dd7 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -219,7 +219,7 @@ def add_parser(subparsers, _parent_parser): "--drop-index", action="store_true", default=False, - help="Drop local index for the specified remote cache.", + help="Drop local index for the remote.", ) push_parser.set_defaults(func=CmdDataPush) @@ -342,6 +342,6 @@ def add_parser(subparsers, _parent_parser): "--drop-index", action="store_true", default=False, - help="Drop local index for the specified remote cache.", + help="Drop local index for the remote.", ) status_parser.set_defaults(func=CmdDataStatus) diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index 11b7a04e42..700886e86e 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -66,6 +66,7 @@ def push( By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ return self.repo.cache.local.push( caches, @@ -93,6 +94,7 @@ def pull( By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( @@ -143,6 +145,7 @@ def status( is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 5a4a334eef..31f076bd26 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -38,13 +38,13 @@ def _verify_protocol(protocol=None): protocol = DEFAULT_PROTOCOL if protocol not in SUPPORTED_PROTOCOLS: raise DvcException( - "unsupported remote index protocol version: {}".format(protocol) + "unsupported remote index protocol version: '{}'".format(protocol) ) return protocol def dump(dir_checksums, file_checksums, fobj, protocol=None): - """Write specified checksums to the open file object ``fobj``.""" + """Write specified checksums to the specified open file object""" protocol = _verify_protocol(protocol) _dump_v1(dir_checksums, file_checksums, fobj) @@ -70,7 +70,7 @@ def _dump_v1(dir_checksums, file_checksums, fobj): def load(fobj, protocol=None, dir_suffix=".dir"): - """Read index checksums from the open file object ``fobj``. + """Read index checksums from the specified open file object. Returns a 2-tuple of (dir_checksums, file_checksums). """ @@ -87,7 +87,7 @@ def _load_v1(fobj, dir_suffix=""): fobj.read(_header_v1.size) ) except struct.error as exc: - raise DvcException("Invalid v1 remote index file: {}".format(exc)) + raise DvcException("Invalid v1 remote index file: '{}'".format(exc)) dir_checksums = set() file_checksums = set() crc = 0 @@ -166,7 +166,7 @@ def load(self): except IOError as exc: logger.error( "Failed to load remote index file '{}'. " - "Remote will be re-indexed: {}".format(self.path, exc) + "Remote will be re-indexed: '{}'".format(self.path, exc) ) finally: self.lock.release() From 56e511c0b0bb5b206fa94928a17b5d5a51bea56c Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 10 Apr 2020 15:44:10 +0900 Subject: [PATCH 15/15] treat gc -c as full re-indexing --- dvc/remote/base.py | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 6b1fd62764..ff5f67b823 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -11,6 +11,8 @@ from multiprocessing import cpu_count from operator import itemgetter +from funcy import split + from shortuuid import uuid import dvc.prompt as prompt @@ -751,23 +753,40 @@ def gc(self, named_caches, jobs=None): used.update(dir_cache[self.scheme]) used.update(file_cache[self.scheme]) - removed = False - # checksums must be sorted to ensure we always remove .dir files first - for checksum in sorted( - self.all(jobs, str(self.path_info)), - key=self.is_dir_checksum, - reverse=True, - ): + # checksums must be separated into .dir and file checksums + # to ensure we always remove .dir files first + dir_checksums, file_checksums = split( + self.is_dir_checksum, self.all(jobs, str(self.path_info)) + ) + dir_checksums = frozenset(dir_checksums) + file_checksums = frozenset(file_checksums) + + def remove_checksum(checksum, is_dir=False): if checksum in used: - continue + return False path_info = self.checksum_to_path_info(checksum) - if self.is_dir_checksum(checksum): + if is_dir: self._remove_unpacked_dir(checksum) self.remove(path_info) - removed = True - if removed: - self.index.invalidate() - return removed + return True + + dir_removed = { + checksum + for checksum in dir_checksums + if remove_checksum(checksum, True) + } + file_removed = { + checksum + for checksum in file_checksums + if remove_checksum(checksum) + } + + # save full remote index + self.index.replace( + dir_checksums - dir_removed, file_checksums - file_removed, + ) + self.index.save() + return dir_removed or file_removed def is_protected(self, path_info): return False