Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dvc/external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ def download_update(result):
raise PathMissingError(path, self.url)
save_info = self.local_cache.save(
path,
self.repo_tree,
None,
tree=self.repo_tree,
save_link=False,
download_callback=download_update,
)
save_infos.append(save_info)
Expand Down
2 changes: 1 addition & 1 deletion dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def save(self):

def commit(self):
if self.use_cache:
self.cache.save(self.path_info, self.info)
self.cache.save(self.path_info, self.cache.tree, self.info)

def dumpd(self):
ret = copy(self.info)
Expand Down
6 changes: 3 additions & 3 deletions dvc/remote/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def remove(self, path_info):
logger.debug(f"Removing {path_info}")
self.blob_service.delete_blob(path_info.bucket, path_info.path)

def get_file_checksum(self, path_info):
return self.get_etag(path_info)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
Expand Down Expand Up @@ -138,6 +141,3 @@ class AzureRemote(BaseRemote):
COPY_POLL_SECONDS = 5
LIST_OBJECT_PAGE_SIZE = 5000
TREE_CLS = AzureRemoteTree

def get_file_checksum(self, path_info):
return self.tree.get_etag(path_info)
181 changes: 86 additions & 95 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ def hardlink(self, from_info, to_info):
def reflink(self, from_info, to_info):
raise RemoteActionNotImplemented("reflink", self.scheme)

def get_file_checksum(self, path_info):
raise NotImplementedError

@staticmethod
def _handle_transfer_exception(from_info, to_info, exception, operation):
if isinstance(exception, OSError) and exception.errno == errno.EMFILE:
Expand Down Expand Up @@ -397,51 +400,44 @@ def supported(cls, config):
def cache(self):
return getattr(self.repo.cache, self.scheme)

def get_file_checksum(self, path_info):
raise NotImplementedError

def _calculate_checksums(self, file_infos):
def _calculate_checksums(self, file_infos, tree):
file_infos = list(file_infos)
with Tqdm(
total=len(file_infos),
unit="md5",
desc="Computing file/dir hashes (only done once)",
) as pbar:
worker = pbar.wrap_fn(self.get_file_checksum)
worker = pbar.wrap_fn(tree.get_file_checksum)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we still don't have a cache/remote separation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efiop Not yet. I'm still thinking about the best way to go about that particular separation.

The "remote" functions like push/pull/status (which are currently in LocalRemote) are really functions for syncing the contents of two caches. So I've been wondering if they should just be abstracted into functions for syncing from "this/self" cache to/from any other cache, and then moved into BaseRemote?

And then the remote classes could still be renamed to Cache or RemoteCache (or something else along those lines), but there wouldn't be any further separation into new classes (other than the tree methods).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that we would still have the external output/dependency use case to consider, but it seems like in practice, that is still just a situation where we would have two "s3 caches"

  • one which is used as the typical "dvc remote" for syncing local cache <-> s3
  • and the second which is used as the s3 external out cache

with ThreadPoolExecutor(
max_workers=self.checksum_jobs
) as executor:
tasks = executor.map(worker, file_infos)
checksums = dict(zip(file_infos, tasks))
return checksums

def _collect_dir(self, path_info, tree=None, save_tree=False, **kwargs):
def _collect_dir(self, path_info, tree, save_tree=False, **kwargs):
file_infos = set()

if tree:
walk_files = tree.walk_files
else:
walk_files = self.tree.walk_files

for fname in walk_files(path_info, **kwargs):
for fname in tree.walk_files(path_info, **kwargs):
if DvcIgnore.DVCIGNORE_FILE == fname.name:
raise DvcIgnoreInCollectedDirError(fname.parent)

file_infos.add(fname)

if tree:
checksums = {fi: tree.get_file_checksum(fi) for fi in file_infos}
if save_tree:
for fi, checksum in checksums.items():
self._save_file(fi, checksum, tree=tree, **kwargs)
else:
checksums = {fi: self.state.get(fi) for fi in file_infos}
not_in_state = {
fi for fi, checksum in checksums.items() if checksum is None
}
checksums = {fi: self.state.get(fi) for fi in file_infos}
not_in_state = {
fi for fi, checksum in checksums.items() if checksum is None
}

new_checksums = self._calculate_checksums(not_in_state)
checksums.update(new_checksums)
new_checksums = self._calculate_checksums(not_in_state, tree)
checksums.update(new_checksums)

if save_tree:
logger.debug("_collect_dir save_tree")
logger.debug(f"{kwargs}")
for fi, checksum in checksums.items():
logger.debug(f"_collect_dir saving '{fi}' '{checksum}'")
self._save_file(fi, tree, checksum, **kwargs)

result = [
{
Expand All @@ -463,12 +459,16 @@ def _collect_dir(self, path_info, tree=None, save_tree=False, **kwargs):
return sorted(result, key=itemgetter(self.PARAM_RELPATH))

def get_dir_checksum(self, path_info, tree=None):
if not tree:
tree = self.tree

if not self.cache:
raise RemoteCacheRequiredError(path_info)

dir_info = self._collect_dir(path_info, tree=None)
if tree:
# don't save state entry for path_info if it is a tree path
dir_info = self._collect_dir(path_info, tree)
if tree != self.tree:
# don't save state entry for path_info if it is from a different
# tree
path_info = None
return self._save_dir_info(dir_info, path_info)

Expand All @@ -490,11 +490,12 @@ def _get_dir_info_checksum(self, dir_info):
with open(tmp, "w+") as fobj:
json.dump(dir_info, fobj, sort_keys=True)

tree = self.cache.tree
from_info = PathInfo(tmp)
to_info = self.cache.path_info / tmp_fname("")
self.cache.tree.upload(from_info, to_info, no_progress_bar=True)
to_info = tree.path_info / tmp_fname("")
tree.upload(from_info, to_info, no_progress_bar=True)

checksum = self.get_file_checksum(to_info) + self.CHECKSUM_DIR_SUFFIX
checksum = tree.get_file_checksum(to_info) + self.CHECKSUM_DIR_SUFFIX
return checksum, to_info

def get_dir_cache(self, checksum):
Expand Down Expand Up @@ -544,10 +545,13 @@ def is_dir_checksum(cls, checksum):
return False
return checksum.endswith(cls.CHECKSUM_DIR_SUFFIX)

def get_checksum(self, path_info):
def get_checksum(self, path_info, tree=None):
assert isinstance(path_info, str) or path_info.scheme == self.scheme

if not self.tree.exists(path_info):
if not tree:
tree = self.tree

if not tree.exists(path_info):
return None

checksum = self.state.get(path_info)
Expand All @@ -558,27 +562,25 @@ def get_checksum(self, path_info):
if (
checksum
and self.is_dir_checksum(checksum)
and not self.tree.exists(
self.cache.checksum_to_path_info(checksum)
)
and not tree.exists(self.cache.checksum_to_path_info(checksum))
):
checksum = None

if checksum:
return checksum

if self.tree.isdir(path_info):
checksum = self.get_dir_checksum(path_info)
if tree.isdir(path_info):
checksum = self.get_dir_checksum(path_info, tree)
else:
checksum = self.get_file_checksum(path_info)
checksum = tree.get_file_checksum(path_info)

if checksum:
if checksum and self.tree.exists(path_info):
self.state.save(path_info, checksum)

return checksum

def save_info(self, path_info):
return {self.PARAM_CHECKSUM: self.get_checksum(path_info)}
def save_info(self, path_info, tree=None):
return {self.PARAM_CHECKSUM: self.get_checksum(path_info, tree=tree)}

def changed(self, path_info, checksum_info):
"""Checks if data has changed.
Expand Down Expand Up @@ -677,25 +679,11 @@ def _do_link(self, from_info, to_info, link_method):
"Created '%s': %s -> %s", self.cache_types[0], from_info, to_info,
)

def _save_file(
self, path_info, checksum, save_link=True, tree=None, **kwargs
):
def _save_file(self, path_info, tree, checksum, save_link=True, **kwargs):
assert checksum

cache_info = self.checksum_to_path_info(checksum)
if tree:
if self.changed_cache(checksum):
with tree.open(path_info, mode="rb") as fobj:
# if tree has fetch enabled, DVC out will be fetched on
# open and we do not need to read/copy any data
if not (
tree.isdvc(path_info, strict=False) and tree.fetch
):
self.tree.copy_fobj(fobj, cache_info)
callback = kwargs.get("download_callback")
if callback:
callback(1)
else:
if tree == self.tree:
if self.changed_cache(checksum):
self.tree.move(path_info, cache_info, mode=self.CACHE_MODE)
self.link(cache_info, path_info)
Expand All @@ -710,12 +698,23 @@ def _save_file(

if save_link:
self.state.save_link(path_info)

# we need to update path and cache, since in case of reflink,
# or copy cache type moving original file results in updates on
# next executed command, which causes md5 recalculation
if not tree or is_working_tree(tree):
# we need to update path and cache, since in case of reflink,
# or copy cache type moving original file results in updates on
# next executed command, which causes md5 recalculation
self.state.save(path_info, checksum)
else:
if self.changed_cache(checksum):
with tree.open(path_info, mode="rb") as fobj:
# if tree has fetch enabled, DVC out will be fetched on
# open and we do not need to read/copy any data
if not (
tree.isdvc(path_info, strict=False) and tree.fetch
):
self.tree.copy_fobj(fobj, cache_info)
callback = kwargs.get("download_callback")
if callback:
callback(1)

self.state.save(cache_info, checksum)
return {self.PARAM_CHECKSUM: checksum}

Expand All @@ -741,26 +740,27 @@ def _cache_is_copy(self, path_info):
self.cache_type_confirmed = True
return self.cache_types[0] == "copy"

def _save_dir(
self, path_info, checksum, save_link=True, tree=None, **kwargs
):
if tree:
dir_info = self._collect_dir(
path_info, tree=tree, save_tree=True, **kwargs
)
checksum = self._save_dir_info(dir_info)
else:
def _save_dir(self, path_info, tree, checksum, save_link=True, **kwargs):
if checksum:
dir_info = self.get_dir_cache(checksum)

for entry in Tqdm(
dir_info, desc="Saving " + path_info.name, unit="file"
):
entry_info = path_info / entry[self.PARAM_RELPATH]
entry_checksum = entry[self.PARAM_CHECKSUM]
self._save_file(entry_info, entry_checksum, save_link=False)
self._save_file(
entry_info, tree, entry_checksum, save_link=False
)

if save_link:
self.state.save_link(path_info)
self.state.save(path_info, checksum)
else:
dir_info = self._collect_dir(
path_info, tree, save_tree=True, **kwargs
)
checksum = self._save_dir_info(dir_info)

cache_info = self.checksum_to_path_info(checksum)
self.state.save(cache_info, checksum)
Expand All @@ -772,42 +772,33 @@ def _save_dir(
def protect(path_info):
pass

def save(
self, path_info, checksum_info, save_link=True, tree=None, **kwargs
):
def save(self, path_info, tree, checksum_info, save_link=True, **kwargs):
if path_info.scheme != self.scheme:
raise RemoteActionNotImplemented(
f"save {path_info.scheme} -> {self.scheme}", self.scheme,
)

if tree:
if tree.isdir(path_info):
# save checksum will be computed during tree walk
checksum = None
else:
checksum = tree.get_file_checksum(path_info)
else:
if checksum_info:
checksum = checksum_info[self.PARAM_CHECKSUM]
return self._save(path_info, checksum, save_link, tree, **kwargs)

def _save(self, path_info, checksum, save_link=True, tree=None, **kwargs):
if tree:
logger.debug("Saving tree path '%s' to cache.", path_info)
elif tree.isdir(path_info):
# for dirs, save checksum will be computed during tree walk
checksum = None
else:
checksum = tree.get_file_checksum(path_info)
return self._save(path_info, tree, checksum, save_link, **kwargs)

def _save(self, path_info, tree, checksum, save_link=True, **kwargs):
if checksum:
to_info = self.checksum_to_path_info(checksum)
logger.debug("Saving '%s' to '%s'.", path_info, to_info)

if tree:
isdir = tree.isdir
save_link = False
else:
isdir = self.tree.isdir
logger.debug("Saving tree path '%s' to cache.", path_info)

if isdir(path_info):
if tree.isdir(path_info):
return self._save_dir(
path_info, checksum, save_link, tree, **kwargs
path_info, tree, checksum, save_link, **kwargs
)
return self._save_file(path_info, checksum, save_link, tree, **kwargs)
return self._save_file(path_info, tree, checksum, save_link, **kwargs)

def open(self, *args, **kwargs):
return self.tree.open(*args, **kwargs)
Expand Down
6 changes: 3 additions & 3 deletions dvc/remote/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,9 @@ def remove(self, path_info):
item_id = self._get_item_id(path_info)
self.gdrive_delete_file(item_id)

def get_file_checksum(self, path_info):
raise NotImplementedError

def _upload(self, from_file, to_info, name=None, no_progress_bar=False):
dirname = to_info.parent
assert dirname
Expand All @@ -567,6 +570,3 @@ class GDriveRemote(BaseRemote):
TRAVERSE_WEIGHT_MULTIPLIER = 1
TRAVERSE_PREFIX_LEN = 2
TREE_CLS = GDriveRemoteTree

def get_file_checksum(self, path_info):
raise NotImplementedError
Loading