diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c7da00e5dd..7414397370 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -322,7 +322,7 @@ def is_dir_checksum(cls, checksum): return checksum.endswith(cls.CHECKSUM_DIR_SUFFIX) def get_checksum(self, path_info): - assert path_info.scheme == self.scheme + assert isinstance(path_info, str) or path_info.scheme == self.scheme if not self.exists(path_info): return None @@ -719,6 +719,10 @@ def path_to_checksum(self, path): def checksum_to_path_info(self, checksum): return self.path_info / checksum[0:2] / checksum[2:] + # Return path as a string instead of PathInfo for remotes which support + # string paths (see local) + checksum_to_path = checksum_to_path_info + def list_cache_paths(self, prefix=None, progress_callback=None): raise NotImplementedError @@ -796,8 +800,8 @@ def changed_cache_file(self, checksum): - Remove the file from cache if it doesn't match the actual checksum """ - - cache_info = self.checksum_to_path_info(checksum) + # Prefer string path over PathInfo when possible due to performance + cache_info = self.checksum_to_path(checksum) if self.is_protected(cache_info): logger.debug( "Assuming '%s' is unchanged since it is read-only", cache_info diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 52710a34b7..87f8642290 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -5,7 +5,7 @@ from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial -from funcy import concat +from funcy import cached_property, concat from shortuuid import uuid @@ -67,6 +67,13 @@ def cache_dir(self, value): def supported(cls, config): return True + @cached_property + def cache_path(self): + return os.path.abspath(self.cache_dir) + + def checksum_to_path(self, checksum): + return os.path.join(self.cache_path, checksum[0:2], checksum[2:]) + def list_cache_paths(self, prefix=None, progress_callback=None): assert self.path_info is not None if prefix: @@ -88,7 +95,7 @@ def get(self, md5): def exists(self, path_info): assert is_working_tree(self.repo.tree) - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" return self.repo.tree.exists(fspath_py35(path_info)) def makedirs(self, path_info): @@ -148,11 +155,15 @@ def get_file_checksum(self, path_info): return file_md5(path_info)[0] def remove(self, path_info): - if path_info.scheme != "local": - raise NotImplementedError + if isinstance(path_info, PathInfo): + if path_info.scheme != "local": + raise NotImplementedError + path = path_info.fspath + else: + path = path_info - if self.exists(path_info): - remove(path_info.fspath) + if self.exists(path): + remove(path) def move(self, from_info, to_info, mode=None): if from_info.scheme != "local" or to_info.scheme != "local": @@ -285,11 +296,11 @@ def _status( show_checksums=False, download=False, ): - """Return a tuple of (dir_status_info, file_status_info, dir_mapping). + """Return a tuple of (dir_status_info, file_status_info, dir_contents). dir_status_info contains status for .dir files, file_status_info - contains status for all other files, and dir_mapping is a dict of - {dir_path_info: set(file_path_info...)} which can be used to map + contains status for all other files, and dir_contents is a dict of + {dir_checksum: set(file_checksum, ...)} which can be used to map a .dir file to its file contents. """ logger.debug( @@ -324,30 +335,27 @@ def _status( ) ) return self._make_status( - named_cache, remote, show_checksums, local_exists, remote_exists + named_cache, show_checksums, local_exists, remote_exists ) def _make_status( - self, named_cache, remote, show_checksums, local_exists, remote_exists + self, named_cache, show_checksums, local_exists, remote_exists ): def make_names(checksum, names): return {"name": checksum if show_checksums else " ".join(names)} dir_status = {} file_status = {} - dir_paths = {} + dir_contents = {} for checksum, item in named_cache[self.scheme].items(): if item.children: dir_status[checksum] = make_names(checksum, item.names) - file_status.update( - { - child_checksum: make_names(child_checksum, child.names) - for child_checksum, child in item.children.items() - } - ) - dir_paths[remote.checksum_to_path_info(checksum)] = frozenset( - map(remote.checksum_to_path_info, item.child_keys()) - ) + dir_contents[checksum] = set() + for child_checksum, child in item.children.items(): + file_status[child_checksum] = make_names( + child_checksum, child.names + ) + dir_contents[checksum].add(child_checksum) else: file_status[checksum] = make_names(checksum, item.names) @@ -356,7 +364,7 @@ def make_names(checksum, names): self._log_missing_caches(dict(dir_status, **file_status)) - return dir_status, file_status, dir_paths + return dir_status, file_status, dir_contents def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): # Validate our index by verifying all indexed .dir checksums @@ -409,6 +417,7 @@ def _get_plans(self, download, remote, status_info, status): cache = [] path_infos = [] names = [] + checksums = [] for md5, info in Tqdm( status_info.items(), desc="Analysing status", unit="file" ): @@ -416,6 +425,7 @@ def _get_plans(self, download, remote, status_info, status): cache.append(self.checksum_to_path_info(md5)) path_infos.append(remote.checksum_to_path_info(md5)) names.append(info["name"]) + checksums.append(md5) if download: to_infos = cache @@ -424,7 +434,7 @@ def _get_plans(self, download, remote, status_info, status): to_infos = path_infos from_infos = cache - return from_infos, to_infos, names + return from_infos, to_infos, names, checksums def _process( self, @@ -457,7 +467,7 @@ def _process( if jobs is None: jobs = remote.JOBS - dir_status, file_status, dir_paths = self._status( + dir_status, file_status, dir_contents = self._status( named_cache, remote, jobs=jobs, @@ -482,18 +492,20 @@ def _process( # for uploads, push files first, and any .dir files last file_futures = {} - for from_info, to_info, name in zip(*file_plans): - file_futures[to_info] = executor.submit( + for from_info, to_info, name, checksum in zip(*file_plans): + file_futures[checksum] = executor.submit( func, from_info, to_info, name ) dir_futures = {} - for from_info, to_info, name in zip(*dir_plans): + for from_info, to_info, name, dir_checksum in zip( + *dir_plans + ): wait_futures = { future - for file_path, future in file_futures.items() - if file_path in dir_paths[to_info] + for file_checksum, future in file_futures.items() + if file_checksum in dir_contents[dir_checksum] } - dir_futures[to_info] = executor.submit( + dir_futures[dir_checksum] = executor.submit( self._dir_upload, func, wait_futures, @@ -516,12 +528,9 @@ def _process( if not download: # index successfully pushed dirs - for to_info, future in dir_futures.items(): + for dir_checksum, future in dir_futures.items(): if future.result() == 0: - dir_checksum = remote.path_to_checksum(str(to_info)) - file_checksums = list( - named_cache.child_keys(self.scheme, dir_checksum) - ) + file_checksums = dir_contents[dir_checksum] logger.debug( "Indexing pushed dir '{}' with " "'{}' nested files".format( diff --git a/dvc/state.py b/dvc/state.py index 3cdfd09fc2..22881daff8 100644 --- a/dvc/state.py +++ b/dvc/state.py @@ -367,7 +367,7 @@ def save(self, path_info, checksum): path_info (dict): path_info to save checksum for. checksum (str): checksum to save. """ - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" assert checksum is not None assert os.path.exists(fspath_py35(path_info)) @@ -398,7 +398,7 @@ def get(self, path_info): str or None: checksum for the specified path info or None if it doesn't exist in the state database. """ - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" path = fspath_py35(path_info) if not os.path.exists(path): @@ -425,7 +425,7 @@ def save_link(self, path_info): Args: path_info (dict): path info to add to the list of links. """ - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" if not os.path.exists(fspath_py35(path_info)): return