From 4f2fce20d39ac19f42d5db06f28e61f3c033144f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 23 Apr 2020 17:01:27 +0900 Subject: [PATCH 1/9] remote: use checksums instead of paths when filling dir statuses --- dvc/remote/local.py | 52 ++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 52710a34b7..7a3a656d70 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -285,11 +285,11 @@ def _status( show_checksums=False, download=False, ): - """Return a tuple of (dir_status_info, file_status_info, dir_mapping). + """Return a tuple of (dir_status_info, file_status_info, dir_contents). dir_status_info contains status for .dir files, file_status_info - contains status for all other files, and dir_mapping is a dict of - {dir_path_info: set(file_path_info...)} which can be used to map + contains status for all other files, and dir_contents is a dict of + {dir_checksum: set(file_checksum, ...)} which can be used to map a .dir file to its file contents. """ logger.debug( @@ -335,19 +335,16 @@ def make_names(checksum, names): dir_status = {} file_status = {} - dir_paths = {} + dir_contents = {} for checksum, item in named_cache[self.scheme].items(): if item.children: dir_status[checksum] = make_names(checksum, item.names) - file_status.update( - { - child_checksum: make_names(child_checksum, child.names) - for child_checksum, child in item.children.items() - } - ) - dir_paths[remote.checksum_to_path_info(checksum)] = frozenset( - map(remote.checksum_to_path_info, item.child_keys()) - ) + dir_contents[checksum] = set() + for child_checksum, child in item.children.items(): + file_status[child_checksum] = make_names( + child_checksum, child.names + ) + dir_contents[checksum].add(child_checksum) else: file_status[checksum] = make_names(checksum, item.names) @@ -356,7 +353,7 @@ def make_names(checksum, names): self._log_missing_caches(dict(dir_status, **file_status)) - return dir_status, file_status, dir_paths + return dir_status, file_status, dir_contents def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): # Validate our index by verifying all indexed .dir checksums @@ -409,6 +406,7 @@ def _get_plans(self, download, remote, status_info, status): cache = [] path_infos = [] names = [] + checksums = [] for md5, info in Tqdm( status_info.items(), desc="Analysing status", unit="file" ): @@ -416,6 +414,7 @@ def _get_plans(self, download, remote, status_info, status): cache.append(self.checksum_to_path_info(md5)) path_infos.append(remote.checksum_to_path_info(md5)) names.append(info["name"]) + checksums.append(md5) if download: to_infos = cache @@ -424,7 +423,7 @@ def _get_plans(self, download, remote, status_info, status): to_infos = path_infos from_infos = cache - return from_infos, to_infos, names + return from_infos, to_infos, names, checksums def _process( self, @@ -457,7 +456,7 @@ def _process( if jobs is None: jobs = remote.JOBS - dir_status, file_status, dir_paths = self._status( + dir_status, file_status, dir_contents = self._status( named_cache, remote, jobs=jobs, @@ -482,18 +481,20 @@ def _process( # for uploads, push files first, and any .dir files last file_futures = {} - for from_info, to_info, name in zip(*file_plans): - file_futures[to_info] = executor.submit( + for from_info, to_info, name, checksum in zip(*file_plans): + file_futures[checksum] = executor.submit( func, from_info, to_info, name ) dir_futures = {} - for from_info, to_info, name in zip(*dir_plans): + for from_info, to_info, name, dir_checksum in zip( + *dir_plans + ): wait_futures = { future - for file_path, future in file_futures.items() - if file_path in dir_paths[to_info] + for file_checksum, future in file_futures.items() + if file_checksum in dir_contents[dir_checksum] } - dir_futures[to_info] = executor.submit( + dir_futures[dir_checksum] = executor.submit( self._dir_upload, func, wait_futures, @@ -516,12 +517,9 @@ def _process( if not download: # index successfully pushed dirs - for to_info, future in dir_futures.items(): + for dir_checksum, future in dir_futures.items(): if future.result() == 0: - dir_checksum = remote.path_to_checksum(str(to_info)) - file_checksums = list( - named_cache.child_keys(self.scheme, dir_checksum) - ) + file_checksums = dir_contents[dir_checksum] logger.debug( "Indexing pushed dir '{}' with " "'{}' nested files".format( From 255e0ce0ce868975b12841397da022ac85f275d1 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 23 Apr 2020 17:41:23 +0900 Subject: [PATCH 2/9] remote: prefer using str paths over PathInfo for performance reasons --- dvc/remote/base.py | 30 ++++++++++++++++++++---------- dvc/remote/local.py | 2 +- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c7da00e5dd..e7d421da85 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -3,6 +3,7 @@ import itertools import json import logging +import posixpath import tempfile from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor @@ -11,6 +12,8 @@ from multiprocessing import cpu_count from operator import itemgetter +from funcy import cached_property + from shortuuid import uuid import dvc.prompt as prompt @@ -322,7 +325,7 @@ def is_dir_checksum(cls, checksum): return checksum.endswith(cls.CHECKSUM_DIR_SUFFIX) def get_checksum(self, path_info): - assert path_info.scheme == self.scheme + assert isinstance(path_info, str) or path_info.scheme == self.scheme if not self.exists(path_info): return None @@ -719,6 +722,13 @@ def path_to_checksum(self, path): def checksum_to_path_info(self, checksum): return self.path_info / checksum[0:2] / checksum[2:] + def checksum_to_path(self, checksum): + return posixpath.join(self._path_str, checksum[0:2], checksum[2:]) + + @cached_property + def _path_str(self): + return str(self.path_info) + def list_cache_paths(self, prefix=None, progress_callback=None): raise NotImplementedError @@ -797,18 +807,18 @@ def changed_cache_file(self, checksum): - Remove the file from cache if it doesn't match the actual checksum """ - cache_info = self.checksum_to_path_info(checksum) - if self.is_protected(cache_info): + cache_path = self.checksum_to_path(checksum) + if self.is_protected(cache_path): logger.debug( - "Assuming '%s' is unchanged since it is read-only", cache_info + "Assuming '%s' is unchanged since it is read-only", cache_path ) return False - actual = self.get_checksum(cache_info) + actual = self.get_checksum(cache_path) logger.debug( "cache '%s' expected '%s' actual '%s'", - cache_info, + cache_path, checksum, actual, ) @@ -819,12 +829,12 @@ def changed_cache_file(self, checksum): if actual.split(".")[0] == checksum.split(".")[0]: # making cache file read-only so we don't need to check it # next time - self.protect(cache_info) + self.protect(cache_path) return False - if self.exists(cache_info): - logger.warning("corrupted cache file '%s'.", cache_info) - self.remove(cache_info) + if self.exists(cache_path): + logger.warning("corrupted cache file '%s'.", cache_path) + self.remove(cache_path) return True diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 7a3a656d70..0f97fce956 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -88,7 +88,7 @@ def get(self, md5): def exists(self, path_info): assert is_working_tree(self.repo.tree) - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" return self.repo.tree.exists(fspath_py35(path_info)) def makedirs(self, path_info): From 7ee6897abf1658315a6eded79e20b5ffa42894c8 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 23 Apr 2020 18:44:54 +0900 Subject: [PATCH 3/9] only use string paths for RemoteLOCAL --- dvc/remote/base.py | 23 ++++++++++------------- dvc/remote/local.py | 16 ++++++++++++---- dvc/state.py | 6 +++--- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index e7d421da85..82b19dc66c 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -3,7 +3,6 @@ import itertools import json import logging -import posixpath import tempfile from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor @@ -722,8 +721,7 @@ def path_to_checksum(self, path): def checksum_to_path_info(self, checksum): return self.path_info / checksum[0:2] / checksum[2:] - def checksum_to_path(self, checksum): - return posixpath.join(self._path_str, checksum[0:2], checksum[2:]) + checksum_to_path = checksum_to_path_info @cached_property def _path_str(self): @@ -806,19 +804,18 @@ def changed_cache_file(self, checksum): - Remove the file from cache if it doesn't match the actual checksum """ - - cache_path = self.checksum_to_path(checksum) - if self.is_protected(cache_path): + cache_info = self.checksum_to_path(checksum) + if self.is_protected(cache_info): logger.debug( - "Assuming '%s' is unchanged since it is read-only", cache_path + "Assuming '%s' is unchanged since it is read-only", cache_info ) return False - actual = self.get_checksum(cache_path) + actual = self.get_checksum(cache_info) logger.debug( "cache '%s' expected '%s' actual '%s'", - cache_path, + cache_info, checksum, actual, ) @@ -829,12 +826,12 @@ def changed_cache_file(self, checksum): if actual.split(".")[0] == checksum.split(".")[0]: # making cache file read-only so we don't need to check it # next time - self.protect(cache_path) + self.protect(cache_info) return False - if self.exists(cache_path): - logger.warning("corrupted cache file '%s'.", cache_path) - self.remove(cache_path) + if self.exists(cache_info): + logger.warning("corrupted cache file '%s'.", cache_info) + self.remove(cache_info) return True diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 0f97fce956..7c7a574aaf 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -1,6 +1,7 @@ import errno import logging import os +import posixpath import stat from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial @@ -67,6 +68,9 @@ def cache_dir(self, value): def supported(cls, config): return True + def checksum_to_path(self, checksum): + return posixpath.join(self._path_str, checksum[0:2], checksum[2:]) + def list_cache_paths(self, prefix=None, progress_callback=None): assert self.path_info is not None if prefix: @@ -148,11 +152,15 @@ def get_file_checksum(self, path_info): return file_md5(path_info)[0] def remove(self, path_info): - if path_info.scheme != "local": - raise NotImplementedError + if isinstance(path_info, PathInfo): + if path_info.scheme != "local": + raise NotImplementedError + path = path_info.fspath + else: + path = path_info - if self.exists(path_info): - remove(path_info.fspath) + if self.exists(path): + remove(path) def move(self, from_info, to_info, mode=None): if from_info.scheme != "local" or to_info.scheme != "local": diff --git a/dvc/state.py b/dvc/state.py index 3cdfd09fc2..22881daff8 100644 --- a/dvc/state.py +++ b/dvc/state.py @@ -367,7 +367,7 @@ def save(self, path_info, checksum): path_info (dict): path_info to save checksum for. checksum (str): checksum to save. """ - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" assert checksum is not None assert os.path.exists(fspath_py35(path_info)) @@ -398,7 +398,7 @@ def get(self, path_info): str or None: checksum for the specified path info or None if it doesn't exist in the state database. """ - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" path = fspath_py35(path_info) if not os.path.exists(path): @@ -425,7 +425,7 @@ def save_link(self, path_info): Args: path_info (dict): path info to add to the list of links. """ - assert path_info.scheme == "local" + assert isinstance(path_info, str) or path_info.scheme == "local" if not os.path.exists(fspath_py35(path_info)): return From eb40f8d9bbf3bc0b4ce29bd9eb8931eb5302b817 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 23 Apr 2020 19:55:57 +0900 Subject: [PATCH 4/9] only optimize calls made from RemoteLOCAL.cache_exists --- dvc/remote/base.py | 15 +++++---------- dvc/remote/local.py | 12 ++++++++---- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 82b19dc66c..1f52e59255 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -11,8 +11,6 @@ from multiprocessing import cpu_count from operator import itemgetter -from funcy import cached_property - from shortuuid import uuid import dvc.prompt as prompt @@ -721,12 +719,6 @@ def path_to_checksum(self, path): def checksum_to_path_info(self, checksum): return self.path_info / checksum[0:2] / checksum[2:] - checksum_to_path = checksum_to_path_info - - @cached_property - def _path_str(self): - return str(self.path_info) - def list_cache_paths(self, prefix=None, progress_callback=None): raise NotImplementedError @@ -792,7 +784,7 @@ def gc(self, named_cache, jobs=None): def is_protected(self, path_info): return False - def changed_cache_file(self, checksum): + def changed_cache_file(self, checksum, path_info=None): """Compare the given checksum with the (corresponding) actual one. - Use `State` as a cache for computed checksums @@ -804,7 +796,10 @@ def changed_cache_file(self, checksum): - Remove the file from cache if it doesn't match the actual checksum """ - cache_info = self.checksum_to_path(checksum) + if path_info: + cache_info = path_info + else: + cache_info = self.checksum_to_path_info(checksum) if self.is_protected(cache_info): logger.debug( "Assuming '%s' is unchanged since it is read-only", cache_info diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 7c7a574aaf..ae788abad9 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -68,9 +68,6 @@ def cache_dir(self, value): def supported(cls, config): return True - def checksum_to_path(self, checksum): - return posixpath.join(self._path_str, checksum[0:2], checksum[2:]) - def list_cache_paths(self, prefix=None, progress_callback=None): assert self.path_info is not None if prefix: @@ -233,6 +230,11 @@ def reflink(self, from_info, to_info): os.rename(fspath_py35(tmp_info), fspath_py35(to_info)) def cache_exists(self, checksums, jobs=None, name=None): + base_path = str(self.path_info) + + def checksum_to_path(checksum): + return posixpath.join(base_path, checksum[0:2], checksum[2:]) + return [ checksum for checksum in Tqdm( @@ -241,7 +243,9 @@ def cache_exists(self, checksums, jobs=None, name=None): desc="Querying " + ("cache in " + name if name else "local cache"), ) - if not self.changed_cache_file(checksum) + if not self.changed_cache_file( + checksum, path_info=checksum_to_path(checksum) + ) ] def _upload( From 8487213b886da56db312f8f03ac6ae243580fd83 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 24 Apr 2020 11:44:59 +0900 Subject: [PATCH 5/9] fix ds warning --- dvc/remote/local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index ae788abad9..1fc2d8592f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -336,11 +336,11 @@ def _status( ) ) return self._make_status( - named_cache, remote, show_checksums, local_exists, remote_exists + named_cache, show_checksums, local_exists, remote_exists ) def _make_status( - self, named_cache, remote, show_checksums, local_exists, remote_exists + self, named_cache, show_checksums, local_exists, remote_exists ): def make_names(checksum, names): return {"name": checksum if show_checksums else " ".join(names)} From c4863b12e886b53ba79c051a715e823cf3ab0d19 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 24 Apr 2020 13:46:04 +0900 Subject: [PATCH 6/9] use os.path in local --- dvc/remote/local.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 1fc2d8592f..637c7e58e8 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -1,7 +1,6 @@ import errno import logging import os -import posixpath import stat from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial @@ -233,7 +232,7 @@ def cache_exists(self, checksums, jobs=None, name=None): base_path = str(self.path_info) def checksum_to_path(checksum): - return posixpath.join(base_path, checksum[0:2], checksum[2:]) + return os.path.join(base_path, checksum[0:2], checksum[2:]) return [ checksum From c5cefd16083045128167b3dc4109c076ff6d2633 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 27 Apr 2020 13:56:00 +0900 Subject: [PATCH 7/9] remote: re-add checksum_to_path() to return string paths when applicable - cloud remotes still default to using PathInfo's --- dvc/remote/base.py | 12 +++++++----- dvc/remote/local.py | 12 ++++-------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 1f52e59255..7414397370 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -719,6 +719,10 @@ def path_to_checksum(self, path): def checksum_to_path_info(self, checksum): return self.path_info / checksum[0:2] / checksum[2:] + # Return path as a string instead of PathInfo for remotes which support + # string paths (see local) + checksum_to_path = checksum_to_path_info + def list_cache_paths(self, prefix=None, progress_callback=None): raise NotImplementedError @@ -784,7 +788,7 @@ def gc(self, named_cache, jobs=None): def is_protected(self, path_info): return False - def changed_cache_file(self, checksum, path_info=None): + def changed_cache_file(self, checksum): """Compare the given checksum with the (corresponding) actual one. - Use `State` as a cache for computed checksums @@ -796,10 +800,8 @@ def changed_cache_file(self, checksum, path_info=None): - Remove the file from cache if it doesn't match the actual checksum """ - if path_info: - cache_info = path_info - else: - cache_info = self.checksum_to_path_info(checksum) + # Prefer string path over PathInfo when possible due to performance + cache_info = self.checksum_to_path(checksum) if self.is_protected(cache_info): logger.debug( "Assuming '%s' is unchanged since it is read-only", cache_info diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 637c7e58e8..3ec98c119a 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -67,6 +67,9 @@ def cache_dir(self, value): def supported(cls, config): return True + def checksum_to_path(self, checksum): + return os.path.join(self.path_info.fspath, checksum[0:2], checksum[2:]) + def list_cache_paths(self, prefix=None, progress_callback=None): assert self.path_info is not None if prefix: @@ -229,11 +232,6 @@ def reflink(self, from_info, to_info): os.rename(fspath_py35(tmp_info), fspath_py35(to_info)) def cache_exists(self, checksums, jobs=None, name=None): - base_path = str(self.path_info) - - def checksum_to_path(checksum): - return os.path.join(base_path, checksum[0:2], checksum[2:]) - return [ checksum for checksum in Tqdm( @@ -242,9 +240,7 @@ def checksum_to_path(checksum): desc="Querying " + ("cache in " + name if name else "local cache"), ) - if not self.changed_cache_file( - checksum, path_info=checksum_to_path(checksum) - ) + if not self.changed_cache_file(checksum) ] def _upload( From 7b752edaa5b792656ee69e69ce42a675bb025ecd Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 27 Apr 2020 14:24:28 +0900 Subject: [PATCH 8/9] cache fspath string --- dvc/remote/local.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 3ec98c119a..28e28d121d 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -5,7 +5,7 @@ from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial -from funcy import concat +from funcy import cached_property, concat from shortuuid import uuid @@ -67,8 +67,12 @@ def cache_dir(self, value): def supported(cls, config): return True + @cached_property + def cache_path(self): + return str(self.cache_dir) + def checksum_to_path(self, checksum): - return os.path.join(self.path_info.fspath, checksum[0:2], checksum[2:]) + return os.path.join(self.cache_path, checksum[0:2], checksum[2:]) def list_cache_paths(self, prefix=None, progress_callback=None): assert self.path_info is not None From 33ec6d7854697cd487e97f3226ba33a51ee1766a Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 27 Apr 2020 15:09:11 +0900 Subject: [PATCH 9/9] use abspath in checksum_to_path - if path is not relpath from cwd or abspath, posix lstat() syscall runtime doubles (from calculating relpath from cwd) --- dvc/remote/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 28e28d121d..87f8642290 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -69,7 +69,7 @@ def supported(cls, config): @cached_property def cache_path(self): - return str(self.cache_dir) + return os.path.abspath(self.cache_dir) def checksum_to_path(self, checksum): return os.path.join(self.cache_path, checksum[0:2], checksum[2:])