diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index 3e6a3b8ed2..ff9fbd7dd7 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -30,6 +30,7 @@ def run(self): with_deps=self.args.with_deps, force=self.args.force, recursive=self.args.recursive, + drop_index=self.args.drop_index, ) log_summary(stats) except (CheckoutError, DvcException) as exc: @@ -53,6 +54,7 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, + drop_index=self.args.drop_index, ) except DvcException: logger.exception("failed to push data to the cloud") @@ -158,6 +160,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Pull cache for subdirectories of the specified directory.", ) + pull_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the specified remote.", + ) pull_parser.set_defaults(func=CmdDataPull) # Push @@ -207,6 +215,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Push cache for subdirectories of specified directory.", ) + push_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the remote.", + ) push_parser.set_defaults(func=CmdDataPush) # Fetch @@ -324,4 +338,10 @@ def add_parser(subparsers, _parent_parser): default=False, help="Show status for all dependencies of the specified target.", ) + status_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the remote.", + ) status_parser.set_defaults(func=CmdDataStatus) diff --git a/dvc/command/status.py b/dvc/command/status.py index a441f71369..4585fd2ef4 100644 --- a/dvc/command/status.py +++ b/dvc/command/status.py @@ -49,6 +49,7 @@ def run(self): all_tags=self.args.all_tags, all_commits=self.args.all_commits, with_deps=self.args.with_deps, + drop_index=self.args.drop_index, ) if st: if self.args.quiet: diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index ea375b65ce..700886e86e 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,67 +48,110 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push(self, cache, jobs=None, remote=None, show_checksums=False): + def push( + self, + caches, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Push data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to push to the cloud. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to push to the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to push to. By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ return self.repo.cache.local.push( - cache, + caches, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, + drop_index=drop_index, ) - def pull(self, cache, jobs=None, remote=None, show_checksums=False): + def pull( + self, + caches, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Pull data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to pull from the cloud. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to pull from the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to pull from. By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, + jobs=jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) if not remote.verify: - self._save_pulled_checksums(cache) + self._save_pulled_checksums(caches) return downloaded_items_num def _save_pulled_checksums(self, cache): - for checksum in cache["local"].keys(): - cache_file = self.repo.cache.local.checksum_to_path_info(checksum) - if self.repo.cache.local.exists(cache_file): - # We can safely save here, as existing corrupted files will be - # removed upon status, while files corrupted during download - # will not be moved from tmp_file (see `RemoteBASE.download()`) - self.repo.state.save(cache_file, checksum) - - def status(self, cache, jobs=None, remote=None, show_checksums=False): + for dir_cache, file_cache in cache: + checksums = set(file_cache["local"].keys()) + if dir_cache is not None: + checksums.update(dir_cache["local"].keys()) + for checksum in checksums: + cache_file = self.repo.cache.local.checksum_to_path_info( + checksum + ) + if self.repo.cache.local.exists(cache_file): + # We can safely save here, as existing corrupted files will + # be removed upon status, while files corrupted during + # download will not be moved from tmp_file + # (see `RemoteBASE.download()`) + self.repo.state.save(cache_file, checksum) + + def status( + self, + caches, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Check status of data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to check status for. + caches (list): list of (dir_cache, file_cache) tuples containg + named checksums to check status for. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to compare cache to. By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, + jobs=jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) diff --git a/dvc/external_repo.py b/dvc/external_repo.py index a598eb6692..142fcfc9d2 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -104,7 +104,7 @@ def _pull_cached(self, out, path_info, dest): # Only pull unless all needed cache is present if out.changed_cache(filter_info=src): - self.cloud.pull(out.get_used_cache(filter_info=src)) + self.cloud.pull([out.get_used_cache(filter_info=src)]) try: out.checkout(filter_info=src) diff --git a/dvc/output/base.py b/dvc/output/base.py index f6f7856de5..218f3aa4b2 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -363,8 +363,9 @@ def _collect_used_dir_cache( if self.cache.changed_cache_file(self.checksum): try: + cache = NamedCache.make("local", self.checksum, str(self)) self.repo.cloud.pull( - NamedCache.make("local", self.checksum, str(self)), + [(None, cache)], jobs=jobs, remote=remote, show_checksums=False, @@ -401,16 +402,22 @@ def get_used_cache(self, **kwargs): In case that the given output is a directory, it will also include the `info` of its files. + + Returns: + 2-tuple of NamedCache objects in the form of + (directory `info`, file `info`). + If the given output is not a directory, the first tuple entry will + be None. """ if not self.use_cache: - return NamedCache() + return None, NamedCache() if self.stage.is_repo_import: cache = NamedCache() (dep,) = self.stage.deps cache.external[dep.repo_pair].add(dep.def_path) - return cache + return None, cache if not self.checksum: msg = ( @@ -429,16 +436,14 @@ def get_used_cache(self, **kwargs): ) ) logger.warning(msg) - return NamedCache() + return None, NamedCache() ret = NamedCache.make(self.scheme, self.checksum, str(self)) if not self.is_dir_checksum: - return ret + return None, ret - ret.update(self._collect_used_dir_cache(**kwargs)) - - return ret + return ret, self._collect_used_dir_cache(**kwargs) @classmethod def _validate_output_path(cls, path): diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c349a7a12b..ff5f67b823 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -1,4 +1,5 @@ import errno +import hashlib import itertools import json import logging @@ -10,6 +11,8 @@ from multiprocessing import cpu_count from operator import itemgetter +from funcy import split + from shortuuid import uuid import dvc.prompt as prompt @@ -23,6 +26,7 @@ from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo from dvc.progress import Tqdm +from dvc.remote.index import RemoteIndex from dvc.remote.slow_link_detection import slow_link_guard from dvc.state import StateNoop from dvc.utils import tmp_fname @@ -111,6 +115,15 @@ def __init__(self, repo, config): self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES) self.cache_type_confirmed = False + url = config.get("url") + if url: + index_name = hashlib.sha256(url.encode("utf-8")).hexdigest() + else: + index_name = None + self.index = RemoteIndex( + self.repo, index_name, self.CHECKSUM_DIR_SUFFIX + ) + @classmethod def get_missing_deps(cls): import importlib @@ -731,23 +744,49 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) - def gc(self, named_cache, jobs=None): - logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs)) - used = self.extract_used_local_checksums(named_cache) + def gc(self, named_caches, jobs=None): + used = self.extract_used_local_checksums(named_caches) if self.scheme != "": - used.update(named_cache[self.scheme]) + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache[self.scheme]) + used.update(file_cache[self.scheme]) + + # checksums must be separated into .dir and file checksums + # to ensure we always remove .dir files first + dir_checksums, file_checksums = split( + self.is_dir_checksum, self.all(jobs, str(self.path_info)) + ) + dir_checksums = frozenset(dir_checksums) + file_checksums = frozenset(file_checksums) - removed = False - for checksum in self.all(jobs, str(self.path_info)): + def remove_checksum(checksum, is_dir=False): if checksum in used: - continue + return False path_info = self.checksum_to_path_info(checksum) - if self.is_dir_checksum(checksum): + if is_dir: self._remove_unpacked_dir(checksum) self.remove(path_info) - removed = True - return removed + return True + + dir_removed = { + checksum + for checksum in dir_checksums + if remove_checksum(checksum, True) + } + file_removed = { + checksum + for checksum in file_checksums + if remove_checksum(checksum) + } + + # save full remote index + self.index.replace( + dir_checksums - dir_removed, file_checksums - file_removed, + ) + self.index.save() + return dir_removed or file_removed def is_protected(self, path_info): return False @@ -865,10 +904,20 @@ def cache_exists(self, checksums, jobs=None, name=None): # cache_exists() (see ssh, local) assert self.TRAVERSE_PREFIX_LEN >= 2 - if len(checksums) == 1 or not self.CAN_TRAVERSE: - return self._cache_object_exists(checksums, jobs, name) + checksums = set(checksums) + indexed_checksums = checksums & self.index.checksums + checksums -= indexed_checksums + logger.debug( + "Matched {} indexed checksums".format(len(indexed_checksums)) + ) + if not checksums: + return indexed_checksums - checksums = frozenset(checksums) + if len(checksums) == 1 or not self.CAN_TRAVERSE: + remote_checksums = self._cache_object_exists(checksums, jobs, name) + if remote_checksums: + self.index.update_all(remote_checksums) + return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method remote_size, remote_checksums = self._estimate_cache_size( @@ -900,10 +949,15 @@ def cache_exists(self, checksums, jobs=None, name=None): logger.debug( "Querying {} checksums via traverse".format(len(checksums)) ) - remote_checksums = self._cache_checksums_traverse( - remote_size, remote_checksums, jobs, name + remote_checksums = set( + self._cache_checksums_traverse( + remote_size, remote_checksums, jobs, name + ) + ) + self.index.replace_all(remote_checksums) + return list(indexed_checksums) + list( + checksums & set(remote_checksums) ) - return list(checksums & set(remote_checksums)) def _checksums_with_limit( self, limit, prefix=None, progress_callback=None @@ -1246,8 +1300,12 @@ def unprotect(path_info): def _get_unpacked_dir_names(self, checksums): return set() - def extract_used_local_checksums(self, named_cache): - used = set(named_cache["local"]) + def extract_used_local_checksums(self, named_caches): + used = set() + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache["local"]) + used.update(file_cache["local"]) unpacked = self._get_unpacked_dir_names(used) return used | unpacked diff --git a/dvc/remote/index.py b/dvc/remote/index.py new file mode 100644 index 0000000000..31f076bd26 --- /dev/null +++ b/dvc/remote/index.py @@ -0,0 +1,243 @@ +import logging +import os +import struct +import threading +import zlib +from binascii import unhexlify + +from funcy import chunks, concat, split + +from dvc.exceptions import DvcException + +logger = logging.getLogger(__name__) + + +DEFAULT_PROTOCOL = 1 +SUPPORTED_PROTOCOLS = [1] + +# Index format (v1) is the following: +# +# (all integer types are little-endian) +# +# Header +# ------ +# protocol_version (32-bit uint): index file protocol version +# dir_checksum_count (64-bit uint): number of .dir checksums in data section +# file_checksum_count (64-bit uint): number of file checksums in data section +# crc - 32-bit CRC32 checksum of uncompressed data +# +# Data (total length = 16 * dir_checksum_count * file_checksum_count bytes) +# ---------------------- +# array of 128-bit MD5 .dir checksums +# array of 128-bit MD5 file checksums +_header_v1 = struct.Struct(" 0: + data = fobj.read(min(bytes_remaining, 16384)) + bytes_remaining -= len(data) + yield data + + for data in read_chunks(dir_count): + crc = zlib.crc32(data, crc) + checksums = chunks(32, data.hex()) + dir_checksums.update(checksum + dir_suffix for checksum in checksums) + for data in read_chunks(file_count): + crc = zlib.crc32(data, crc) + checksums = chunks(32, data.hex()) + file_checksums.update(checksums) + + if crc != file_crc: + raise DvcException("Remote index file failed CRC check") + + return dir_checksums, file_checksums + + +class RemoteIndex(object): + """Class for locally indexing remote checksums. + + Args: + repo: repo for this remote index. + name: name for this index. If name is provided, this index will be + loaded from and saved to ``.dvc/tmp/index/{name}.idx``. + If name is not provided (i.e. for local remotes), this index will + be kept in memory but not saved to disk. + dir_suffix: suffix used for naming directory checksums + """ + + INDEX_SUFFIX = ".idx" + + def __init__(self, repo, name=None, dir_suffix=".dir"): + if name: + self.path = os.path.join( + repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX) + ) + else: + self.path = None + self.dir_suffix = dir_suffix + self.lock = threading.RLock() + self._dir_checksums = set() + self._file_checksums = set() + self.modified = False + self.load() + + def __iter__(self): + return iter(self.checksums) + + @property + def checksums(self): + return self._dir_checksums | self._file_checksums + + def is_dir_checksum(self, checksum): + return checksum.endswith(self.dir_suffix) + + def load(self): + """(Re)load this index from disk.""" + if self.path and os.path.isfile(self.path): + self.lock.acquire() + try: + with open(self.path, "rb") as fobj: + self._dir_checksums, self._file_checksums = load( + fobj, dir_suffix=self.dir_suffix + ) + self.modified = False + except IOError as exc: + logger.error( + "Failed to load remote index file '{}'. " + "Remote will be re-indexed: '{}'".format(self.path, exc) + ) + finally: + self.lock.release() + + def save(self): + """Save this index to disk.""" + if self.path and self.modified: + self.lock.acquire() + try: + with open(self.path, "wb") as fobj: + dump(self._dir_checksums, self._file_checksums, fobj) + self.modified = False + except IOError as exc: + logger.error( + "Failed to save remote index file '{}': {}".format( + self.path, exc + ) + ) + finally: + self.lock.release() + + def invalidate(self): + """Invalidate this index (to force re-indexing later).""" + self.lock.acquire() + self._dir_checksums.clear() + self._file_checksums.clear() + self.modified = True + if self.path and os.path.isfile(self.path): + try: + os.unlink(self.path) + except IOError as exc: + logger.error( + "Failed to remove remote index file '{}': {}".format( + self.path, exc + ) + ) + self.lock.release() + + def replace(self, dir_checksums, file_checksums): + """Replace the contents of this index with the specified checksums. + + Changes to the index will not be written to disk. + """ + self.lock.acquire() + self._dir_checksums = set(dir_checksums) + self._file_checksums = set(file_checksums) + self.lock.release() + + def replace_all(self, *checksums): + """Replace the contents of this index with the specified checksums. + + Changes to the index will not be written to disk. + """ + dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) + self.replace(dir_checksums, file_checksums) + + def update(self, dir_checksums, file_checksums): + """Update this index, adding the specified checksums. + + Changes to the index will not be written to disk. + """ + self.lock.acquire() + self._dir_checksums.update(dir_checksums) + self._file_checksums.update(file_checksums) + self.modified = True + self.lock.release() + + def update_all(self, *checksums): + """Update this index, adding the specified checksums. + + Changes to the index will not be written to disk. + """ + dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) + self.update(dir_checksums, file_checksums) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index fe0270ef34..5882bcd0b9 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -2,11 +2,14 @@ import logging import os import stat -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial +from funcy import cat, concat + from shortuuid import uuid +from dvc.cache import NamedCache from dvc.compat import fspath_py35 from dvc.exceptions import DvcException, DownloadError, UploadError from dvc.path_info import PathInfo @@ -250,43 +253,120 @@ def open(path_info, mode="r", encoding=None): def status( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, download=False, + drop_index=False, ): + # Return flattened dict containing all status info + dir_status, file_status, _ = self._status( + named_caches, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=download, + drop_index=drop_index, + ) + remote.index.save() + return dict(dir_status, **file_status) + + def _status( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + download=False, + drop_index=False, + ): + """Return a tuple of (dir_status_info, file_status_info, dir_mapping). + + dir_status_info contains status for .dir files, file_status_info + contains status for all other files, and dir_mapping is a dict of + {dir_path_info: set(file_path_info...)} which can be used to map + a .dir file to its file contents. + """ logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - md5s = list(named_cache[self.scheme]) + merged_dir_cache = NamedCache() + merged_file_cache = NamedCache() + dir_md5s = {} + md5s = set() + for dir_cache, file_cache in named_caches: + merged_file_cache.update(file_cache) + md5s.update(file_cache[self.scheme]) + if dir_cache is not None: + merged_dir_cache.update(dir_cache) + for checksum in dir_cache[self.scheme]: + dir_md5s[checksum] = frozenset(file_cache[self.scheme]) + md5s.update(dir_md5s.keys()) logger.debug("Collecting information from local cache...") - local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir) + local_exists = frozenset( + self.cache_exists(md5s, jobs=jobs, name=self.cache_dir) + ) # This is a performance optimization. We can safely assume that, # if the resources that we want to fetch are already cached, # there's no need to check the remote storage for the existence of # those files. - if download and sorted(local_exists) == sorted(md5s): + if download and local_exists == md5s: remote_exists = local_exists else: logger.debug("Collecting information from remote cache...") - remote_exists = list( - remote.cache_exists( - md5s, jobs=jobs, name=str(remote.path_info) + remote_exists = set() + if drop_index: + logger.debug("Clearing local index for remote cache") + remote.index.invalidate() + else: + # verify that our index is still valid by checking that any + # known .dir checksums still exist on the remote + verify_md5s = remote.index.checksums.intersection( + cat(dir_md5s.keys()) + ) + if verify_md5s: + remote_dir_exists = frozenset( + remote._cache_object_exists(verify_md5s) + ) + md5s -= remote_dir_exists + remote_exists.update(remote_dir_exists) + if remote_dir_exists != verify_md5s: + logger.debug( + "Remote cache missing an expected .dir checksum, " + "clearing local index" + ) + remote.index.invalidate() + if md5s: + remote_exists.update( + remote.cache_exists( + md5s, jobs=jobs, name=str(remote.path_info) + ) ) - ) - - ret = { - checksum: {"name": checksum if show_checksums else " ".join(names)} - for checksum, names in named_cache[self.scheme].items() - } - self._fill_statuses(ret, local_exists, remote_exists) - - self._log_missing_caches(ret) - return ret + def cache_to_dict(cache): + return { + checksum: { + "name": checksum if show_checksums else " ".join(names) + } + for checksum, names in cache[self.scheme].items() + } + + dir_status = cache_to_dict(merged_dir_cache) + file_status = cache_to_dict(merged_file_cache) + self._fill_statuses(dir_status, local_exists, remote_exists) + self._fill_statuses(file_status, local_exists, remote_exists) + + self._log_missing_caches(dict(dir_status, **file_status)) + + dir_paths = {} + for md5, file_md5s in dir_md5s.items(): + dir_paths[remote.checksum_to_path_info(md5)] = frozenset( + map(remote.checksum_to_path_info, file_md5s) + ) + return dir_status, file_status, dir_paths @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): @@ -321,11 +401,12 @@ def _get_plans(self, download, remote, status_info, status): def _process( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, download=False, + drop_index=False, ): logger.debug( "Preparing to {} '{}'".format( @@ -348,48 +429,121 @@ def _process( if jobs is None: jobs = remote.JOBS - status_info = self.status( - named_cache, + dir_status, file_status, dir_paths = self._status( + named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=download, + drop_index=drop_index, ) - plans = self._get_plans(download, remote, status_info, status) + dir_plans = self._get_plans(download, remote, dir_status, status) + file_plans = self._get_plans(download, remote, file_status, status) - if len(plans[0]) == 0: + if len(dir_plans[0]) + len(file_plans[0]) == 0: return 0 - if jobs > 1: - with ThreadPoolExecutor(max_workers=jobs) as executor: - fails = sum(executor.map(func, *plans)) - else: - fails = sum(map(func, *plans)) + with ThreadPoolExecutor(max_workers=jobs) as executor: + if download: + fails = sum(executor.map(func, *dir_plans)) + fails += sum(executor.map(func, *file_plans)) + else: + # for uploads, push files first, and any .dir files last + + file_futures = {} + for from_info, to_info, name in zip(*file_plans): + file_futures[to_info] = executor.submit( + func, from_info, to_info, name + ) + dir_futures = {} + for from_info, to_info, name in zip(*dir_plans): + wait_futures = { + future + for file_path, future in file_futures.items() + if file_path in dir_paths[to_info] + } + dir_futures[to_info] = executor.submit( + self._dir_upload, + func, + wait_futures, + from_info, + to_info, + name, + ) + fails = sum( + future.result() + for future in concat( + file_futures.values(), dir_futures.values() + ) + ) if fails: if download: + remote.index.invalidate() raise DownloadError(fails) raise UploadError(fails) + elif not download: + pushed_dir_checksums = list( + map(self.path_to_checksum, dir_plans[0]) + ) + pushed_file_checksums = list( + map(self.path_to_checksum, file_plans[0]) + ) + logger.debug( + "Adding {} pushed checksums to index".format( + len(pushed_dir_checksums) + len(pushed_file_checksums) + ) + ) + remote.index.update(pushed_dir_checksums, pushed_file_checksums) + remote.index.save() + + return len(dir_plans[0]) + len(file_plans[0]) + + def _dir_upload(self, func, futures, from_info, to_info, name): + for future in as_completed(futures): + if future.result(): + # do not upload this .dir file if any file in this + # directory failed to upload + logger.error( + "one or more files failed while uploading " + "'{}' to '{}'".format(from_info, to_info) + ) + return 1 + return func(from_info, to_info, name) - return len(plans[0]) - - def push(self, named_cache, remote, jobs=None, show_checksums=False): + def push( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + drop_index=False, + ): return self._process( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=False, + drop_index=drop_index, ) - def pull(self, named_cache, remote, jobs=None, show_checksums=False): + def pull( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + drop_index=False, + ): return self._process( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=True, + drop_index=drop_index, ) @staticmethod diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index e7f2b4dfd3..ee739ab39c 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -84,7 +84,8 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) self.tmp_dir = os.path.join(self.dvc_dir, "tmp") - makedirs(self.tmp_dir, exist_ok=True) + self.index_dir = os.path.join(self.tmp_dir, "index") + makedirs(self.index_dir, exist_ok=True) hardlink_lock = self.config["core"].get("hardlink_lock", False) self.lock = make_lock( @@ -251,12 +252,18 @@ def used_cache( `all_branches`/`all_tags`/`all_commits` to expand the scope. Returns: - A dictionary with Schemes (representing output's location) as keys, + A list of 2-tuples in the form (dir_cache, file_cache). + Each NamedCache object is a dictionary with Schemes + (representing output's location) as keys, and a list with the outputs' `dumpd` as values. + If the given output is not a directory, the first tuple entry + will be None. """ from dvc.cache import NamedCache - cache = NamedCache() + used_caches = [] + # group together file caches which do not have an associated directory + file_caches = NamedCache() for branch in self.brancher( all_branches=all_branches, @@ -274,15 +281,24 @@ def used_cache( suffix = "({})".format(branch) if branch else "" for stage, filter_info in pairs: - used_cache = stage.get_used_cache( + for dir_cache, file_cache in stage.get_used_cache( remote=remote, force=force, jobs=jobs, filter_info=filter_info, - ) - cache.update(used_cache, suffix=suffix) - - return cache + ): + if dir_cache is None: + file_caches.update(file_cache, suffix=suffix) + else: + used_dir = NamedCache() + used_dir.update(dir_cache, suffix=suffix) + used_file = NamedCache() + used_file.update(file_cache, suffix=suffix) + used_caches.append((used_dir, used_file)) + + if file_caches._items or file_caches.external: + used_caches.append((None, file_caches)) + return used_caches def _collect_graph(self, stages=None): """Generate a graph by using the given stages on the given directory diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index f77491bb79..e4ecb38453 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -1,12 +1,12 @@ import logging -from dvc.cache import NamedCache +from funcy import concat + from dvc.config import NoRemoteError from dvc.exceptions import DownloadError, OutputNotFoundError from dvc.scm.base import CloneError from dvc.path_info import PathInfo - logger = logging.getLogger(__name__) @@ -21,6 +21,7 @@ def _fetch( all_tags=False, recursive=False, all_commits=False, + drop_index=False, ): """Download data items from a cloud and imported repositories @@ -51,18 +52,41 @@ def _fetch( try: downloaded += self.cloud.pull( - used, jobs, remote=remote, show_checksums=show_checksums + used, + jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) except NoRemoteError: - if not used.external and used["local"]: + external = False + local = False + for dir_cache, file_cache in used: + if dir_cache: + if dir_cache.external: + external = True + if dir_cache["local"]: + local = True + if file_cache.external: + external = True + if file_cache["local"]: + local = True + if not external and local: raise except DownloadError as exc: failed += exc.amount - for (repo_url, repo_rev), files in used.external.items(): - d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) - downloaded += d - failed += f + for dir_cache, file_cache in used: + if dir_cache is None: + items = file_cache.external.items() + else: + items = concat( + dir_cache.external.items(), file_cache.external.items() + ) + for (repo_url, repo_rev), files in items: + d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) + downloaded += d + failed += f if failed: raise DownloadError(failed) @@ -82,7 +106,7 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): if is_dvc_repo: repo.cache.local.cache_dir = self.cache.local.cache_dir with repo.state: - cache = NamedCache() + used_cache = [] for name in files: try: out = repo.find_out_by_relpath(name) @@ -90,10 +114,12 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): # try to add to cache if they are git-tracked files git_files.append(name) else: - cache.update(out.get_used_cache()) + used_cache.append(out.get_used_cache()) try: - downloaded += repo.cloud.pull(cache, jobs=jobs) + downloaded += repo.cloud.pull( + used_cache, jobs=jobs + ) except DownloadError as exc: failed += exc.amount diff --git a/dvc/repo/gc.py b/dvc/repo/gc.py index 8f1222fd84..5cd5d2b645 100644 --- a/dvc/repo/gc.py +++ b/dvc/repo/gc.py @@ -1,7 +1,6 @@ import logging from . import locked -from dvc.cache import NamedCache from dvc.exceptions import InvalidArgumentError @@ -60,9 +59,9 @@ def gc( stack.enter_context(repo.lock) stack.enter_context(repo.state) - used = NamedCache() + used = [] for repo in all_repos + [self]: - used.update( + used.extend( repo.used_cache( all_branches=all_branches, with_deps=with_deps, diff --git a/dvc/repo/pull.py b/dvc/repo/pull.py index 4623f6a4ec..36d99f45b6 100644 --- a/dvc/repo/pull.py +++ b/dvc/repo/pull.py @@ -18,6 +18,7 @@ def pull( force=False, recursive=False, all_commits=False, + drop_index=False, ): processed_files_count = self._fetch( targets, @@ -28,6 +29,7 @@ def pull( all_commits=all_commits, with_deps=with_deps, recursive=recursive, + drop_index=drop_index, ) stats = self._checkout( targets=targets, with_deps=with_deps, force=force, recursive=recursive diff --git a/dvc/repo/push.py b/dvc/repo/push.py index 43946d37c0..ed515dd4f5 100644 --- a/dvc/repo/push.py +++ b/dvc/repo/push.py @@ -12,6 +12,7 @@ def push( all_tags=False, recursive=False, all_commits=False, + drop_index=False, ): used = self.used_cache( targets, @@ -24,4 +25,4 @@ def push( jobs=jobs, recursive=recursive, ) - return self.cloud.push(used, jobs, remote=remote) + return self.cloud.push(used, jobs, remote=remote, drop_index=drop_index) diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 40780c66ec..4d48a81ba2 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -45,6 +45,7 @@ def _cloud_status( with_deps=False, all_tags=False, all_commits=False, + drop_index=False, ): """Returns a dictionary with the files that are new or deleted. @@ -86,7 +87,9 @@ def _cloud_status( ) ret = {} - status_info = self.cloud.status(used, jobs, remote=remote) + status_info = self.cloud.status( + used, jobs, remote=remote, drop_index=drop_index + ) for info in status_info.values(): name = info["name"] status = info["status"] @@ -111,6 +114,7 @@ def status( with_deps=False, all_tags=False, all_commits=False, + drop_index=False, ): if cloud or remote: return _cloud_status( @@ -122,6 +126,7 @@ def status( remote=remote, all_tags=all_tags, all_commits=all_commits, + drop_index=drop_index, ) ignored = list( diff --git a/dvc/stage.py b/dvc/stage.py index 14de51501a..4a74cc13fb 100644 --- a/dvc/stage.py +++ b/dvc/stage.py @@ -1059,10 +1059,9 @@ def get_all_files_number(self, filter_info=None): ) def get_used_cache(self, *args, **kwargs): - from .cache import NamedCache - cache = NamedCache() + ret = [] for out in self._filter_outs(kwargs.get("filter_info")): - cache.update(out.get_used_cache(*args, **kwargs)) + ret.append(out.get_used_cache(*args, **kwargs)) - return cache + return ret diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 1bf93ab7fb..2d17f27616 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -112,7 +112,7 @@ def _test_cloud(self): out = stage.outs[0] cache = out.cache_path md5 = out.checksum - info = out.get_used_cache() + info = [out.get_used_cache()] stages = self.dvc.add(self.DATA_DIR) self.assertEqual(len(stages), 1) @@ -122,7 +122,9 @@ def _test_cloud(self): cache_dir = out_dir.cache_path name_dir = str(out_dir) md5_dir = out_dir.checksum - info_dir = NamedCache.make(out_dir.scheme, md5_dir, name_dir) + info_dir = [ + (NamedCache.make(out_dir.scheme, md5_dir, name_dir), NamedCache()) + ] with self.cloud.repo.state: # Check status diff --git a/tests/unit/command/test_data_sync.py b/tests/unit/command/test_data_sync.py index 41d464dbd9..2bc083b70a 100644 --- a/tests/unit/command/test_data_sync.py +++ b/tests/unit/command/test_data_sync.py @@ -54,6 +54,7 @@ def test_pull(mocker): "--with-deps", "--force", "--recursive", + "--drop-index", ] ) assert cli_args.func == CmdDataPull @@ -73,6 +74,7 @@ def test_pull(mocker): with_deps=True, force=True, recursive=True, + drop_index=True, ) @@ -91,6 +93,7 @@ def test_push(mocker): "--all-commits", "--with-deps", "--recursive", + "--drop-index", ] ) assert cli_args.func == CmdDataPush @@ -109,4 +112,5 @@ def test_push(mocker): all_commits=True, with_deps=True, recursive=True, + drop_index=True, ) diff --git a/tests/unit/command/test_status.py b/tests/unit/command/test_status.py index 7b8ea33248..2663a6bc7c 100644 --- a/tests/unit/command/test_status.py +++ b/tests/unit/command/test_status.py @@ -17,6 +17,7 @@ def test_cloud_status(mocker): "--all-tags", "--all-commits", "--with-deps", + "--drop-index", ] ) assert cli_args.func == CmdDataStatus @@ -35,4 +36,5 @@ def test_cloud_status(mocker): all_tags=True, all_commits=True, with_deps=True, + drop_index=True, ) diff --git a/tests/unit/output/test_output.py b/tests/unit/output/test_output.py index fb9325471b..42e503ec83 100644 --- a/tests/unit/output/test_output.py +++ b/tests/unit/output/test_output.py @@ -86,5 +86,8 @@ def test_get_used_cache(exists, expected_message, mocker, caplog): ).return_value = exists with caplog.at_level(logging.WARNING, logger="dvc"): - assert isinstance(output.get_used_cache(), NamedCache) + used = output.get_used_cache() + assert isinstance(used, tuple) + assert used[0] is None + assert isinstance(used[1], NamedCache) assert first(caplog.messages) == expected_message diff --git a/tests/unit/remote/test_base.py b/tests/unit/remote/test_base.py index e8cafa3b26..7334594627 100644 --- a/tests/unit/remote/test_base.py +++ b/tests/unit/remote/test_base.py @@ -52,7 +52,7 @@ def test_cache_exists(object_exists, traverse, dvc): with mock.patch.object( remote, "cache_checksums", return_value=list(range(256)) ): - checksums = list(range(1000)) + checksums = set(range(1000)) remote.cache_exists(checksums) object_exists.assert_called_with(checksums, None, None) traverse.assert_not_called() diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py new file mode 100644 index 0000000000..bbb61b4f1f --- /dev/null +++ b/tests/unit/remote/test_index.py @@ -0,0 +1,95 @@ +import os.path + +from dvc.remote.index import dump, load, RemoteIndex + + +def test_protocol_v1_roundtrip(tmp_dir): + tmpfile = os.path.join(tmp_dir, "foo.idx") + + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(map(_to_checksum, range(10000))) + with open(tmpfile, "wb") as fobj: + dump(expected_dir, expected_file, fobj) + with open(tmpfile, "rb") as fobj: + dir_checksums, file_checksums = load(fobj, dir_suffix=".dir") + assert dir_checksums == expected_dir + assert not expected_file ^ file_checksums + + +def _to_checksum(n): + return "{:032}".format(n) + + +def test_init(dvc): + index = RemoteIndex(dvc, "foo") + assert str(index.path) == os.path.join(dvc.index_dir, "foo.idx") + + +def test_roundtrip(dvc): + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index = RemoteIndex(dvc, "foo") + index.replace(expected_dir, expected_file) + index.save() + index.load() + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + assert index.checksums == expected_dir | expected_file + + +def test_invalidate(dvc): + index = RemoteIndex(dvc, "foo") + index.replace( + ["0123456789abcdef0123456789abcdef.dir"], + ["fedcba9876543210fedcba9876543210"], + ) + index.save() + index.invalidate() + assert not index.checksums + assert not os.path.exists(index.path) + + +def test_replace(dvc): + index = RemoteIndex(dvc, "foo") + index._dir_checksums = set() + index._file_checksums = set() + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(expected_dir, expected_file) + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + + +def test_replace_all(dvc): + index = RemoteIndex(dvc, "foo") + index._dir_checksums = set() + index._file_checksums = set() + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace_all(expected_dir | expected_file) + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + + +def test_update(dvc): + index = RemoteIndex(dvc, "foo") + initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(initial_dir, initial_file) + expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) + index.update(expected_dir, expected_file) + assert index._dir_checksums == initial_dir | expected_dir + assert index._file_checksums == initial_file | expected_file + + +def test_update_all(dvc): + index = RemoteIndex(dvc, "foo") + initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(initial_dir, initial_file) + expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) + index.update_all(expected_dir | expected_file) + assert index._dir_checksums == initial_dir | expected_dir + assert index._file_checksums == initial_file | expected_file diff --git a/tests/unit/remote/test_local.py b/tests/unit/remote/test_local.py index 04e216bde0..6ef1c389bf 100644 --- a/tests/unit/remote/test_local.py +++ b/tests/unit/remote/test_local.py @@ -26,7 +26,7 @@ def test_status_download_optimization(mocker): other_remote.url = "other_remote" other_remote.cache_exists.return_value = [] - remote.status(infos, other_remote, download=True) + remote.status([(None, infos)], other_remote, download=True) assert other_remote.cache_exists.call_count == 0 diff --git a/tests/unit/repo/test_repo.py b/tests/unit/repo/test_repo.py index 985c113ef1..447487f5b0 100644 --- a/tests/unit/repo/test_repo.py +++ b/tests/unit/repo/test_repo.py @@ -37,10 +37,10 @@ def test_used_cache(tmp_dir, dvc, path): from dvc.cache import NamedCache tmp_dir.dvc_gen({"dir": {"subdir": {"file": "file"}, "other": "other"}}) - expected = NamedCache.make( + expected_dir = NamedCache.make( "local", "70922d6bf66eb073053a82f77d58c536.dir", "dir" ) - expected.add( + expected_file = NamedCache.make( "local", "8c7dd922ad47494fc02c388e12c00eac", os.path.join("dir", "subdir", "file"), @@ -48,9 +48,16 @@ def test_used_cache(tmp_dir, dvc, path): with dvc.state: used_cache = dvc.used_cache([path]) + assert isinstance(used_cache, list) + assert len(used_cache) == 1 + assert isinstance(used_cache[0], tuple) + used_dir = used_cache[0][0] + used_file = used_cache[0][1] assert ( - used_cache._items == expected._items - and used_cache.external == expected.external + used_dir._items == expected_dir._items + and used_dir.external == expected_dir.external + and used_file._items == expected_file._items + and used_file.external == expected_file.external )