Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dvc/command/data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def run(self):
with_deps=self.args.with_deps,
force=self.args.force,
recursive=self.args.recursive,
drop_index=self.args.drop_index,
)
log_summary(stats)
except (CheckoutError, DvcException) as exc:
Expand All @@ -53,6 +54,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
drop_index=self.args.drop_index,
)
except DvcException:
logger.exception("failed to push data to the cloud")
Expand Down Expand Up @@ -158,6 +160,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Pull cache for subdirectories of the specified directory.",
)
pull_parser.add_argument(
"--drop-index",
action="store_true",
default=False,
help="Drop local index for the specified remote.",
Comment on lines +163 to +167
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about "specified remote" since sync commands don't always receive a -r arg. Maybe just remove the word "specified".

Also, "drop index" may not mean anything to a casual user, should this be a little more descriptive?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in the other 2 sync commands.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about --clear-index or --reset-index?

)
pull_parser.set_defaults(func=CmdDataPull)

# Push
Expand Down Expand Up @@ -207,6 +215,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Push cache for subdirectories of specified directory.",
)
push_parser.add_argument(
"--drop-index",
action="store_true",
default=False,
help="Drop local index for the remote.",
)
push_parser.set_defaults(func=CmdDataPush)

# Fetch
Expand Down Expand Up @@ -324,4 +338,10 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Show status for all dependencies of the specified target.",
)
status_parser.add_argument(
"--drop-index",
action="store_true",
default=False,
help="Drop local index for the remote.",
)
status_parser.set_defaults(func=CmdDataStatus)
1 change: 1 addition & 0 deletions dvc/command/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def run(self):
all_tags=self.args.all_tags,
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
drop_index=self.args.drop_index,
)
if st:
if self.args.quiet:
Expand Down
79 changes: 61 additions & 18 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,67 +48,110 @@ def get_remote(self, remote=None, command="<command>"):
def _init_remote(self, remote):
return Remote(self.repo, name=remote)

def push(self, cache, jobs=None, remote=None, show_checksums=False):
def push(
self,
caches,
jobs=None,
remote=None,
show_checksums=False,
drop_index=False,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add drop_index to docstring Args?
Same in other functions/methods.

):
"""Push data items in a cloud-agnostic way.

Args:
cache (NamedCache): named checksums to push to the cloud.
caches (list): list of (dir_cache, file_cache) tuples containing
named checksums to push to the cloud.
jobs (int): number of jobs that can be running simultaneously.
remote (dvc.remote.base.RemoteBASE): optional remote to push to.
By default remote from core.remote config option is used.
show_checksums (bool): show checksums instead of file names in
information messages.
drop_index (bool): clear local index for the remote
"""
return self.repo.cache.local.push(
cache,
caches,
jobs=jobs,
remote=self.get_remote(remote, "push"),
show_checksums=show_checksums,
drop_index=drop_index,
)

def pull(self, cache, jobs=None, remote=None, show_checksums=False):
def pull(
self,
caches,
jobs=None,
remote=None,
show_checksums=False,
drop_index=False,
):
"""Pull data items in a cloud-agnostic way.

Args:
cache (NamedCache): named checksums to pull from the cloud.
caches (list): list of (dir_cache, file_cache) tuples containing
named checksums to pull from the cloud.
jobs (int): number of jobs that can be running simultaneously.
remote (dvc.remote.base.RemoteBASE): optional remote to pull from.
By default remote from core.remote config option is used.
show_checksums (bool): show checksums instead of file names in
information messages.
drop_index (bool): clear local index for the remote
"""
remote = self.get_remote(remote, "pull")
downloaded_items_num = self.repo.cache.local.pull(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
caches,
jobs=jobs,
remote=remote,
show_checksums=show_checksums,
drop_index=drop_index,
)

if not remote.verify:
self._save_pulled_checksums(cache)
self._save_pulled_checksums(caches)

return downloaded_items_num

def _save_pulled_checksums(self, cache):
for checksum in cache["local"].keys():
cache_file = self.repo.cache.local.checksum_to_path_info(checksum)
if self.repo.cache.local.exists(cache_file):
# We can safely save here, as existing corrupted files will be
# removed upon status, while files corrupted during download
# will not be moved from tmp_file (see `RemoteBASE.download()`)
self.repo.state.save(cache_file, checksum)

def status(self, cache, jobs=None, remote=None, show_checksums=False):
for dir_cache, file_cache in cache:
checksums = set(file_cache["local"].keys())
if dir_cache is not None:
checksums.update(dir_cache["local"].keys())
for checksum in checksums:
cache_file = self.repo.cache.local.checksum_to_path_info(
checksum
)
if self.repo.cache.local.exists(cache_file):
# We can safely save here, as existing corrupted files will
# be removed upon status, while files corrupted during
# download will not be moved from tmp_file
# (see `RemoteBASE.download()`)
self.repo.state.save(cache_file, checksum)
Comment on lines +123 to +127
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment getting long πŸ˜‹


def status(
self,
caches,
jobs=None,
remote=None,
show_checksums=False,
drop_index=False,
):
"""Check status of data items in a cloud-agnostic way.

Args:
cache (NamedCache): named checksums to check status for.
caches (list): list of (dir_cache, file_cache) tuples containg
named checksums to check status for.
jobs (int): number of jobs that can be running simultaneously.
remote (dvc.remote.base.RemoteBASE): optional remote to compare
cache to. By default remote from core.remote config option
is used.
show_checksums (bool): show checksums instead of file names in
information messages.
drop_index (bool): clear local index for the remote
"""
remote = self.get_remote(remote, "status")
return self.repo.cache.local.status(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
caches,
jobs=jobs,
remote=remote,
show_checksums=show_checksums,
drop_index=drop_index,
)
2 changes: 1 addition & 1 deletion dvc/external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _pull_cached(self, out, path_info, dest):

# Only pull unless all needed cache is present
if out.changed_cache(filter_info=src):
self.cloud.pull(out.get_used_cache(filter_info=src))
self.cloud.pull([out.get_used_cache(filter_info=src)])

try:
out.checkout(filter_info=src)
Expand Down
21 changes: 13 additions & 8 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,9 @@ def _collect_used_dir_cache(

if self.cache.changed_cache_file(self.checksum):
try:
cache = NamedCache.make("local", self.checksum, str(self))
self.repo.cloud.pull(
NamedCache.make("local", self.checksum, str(self)),
[(None, cache)],
jobs=jobs,
remote=remote,
show_checksums=False,
Expand Down Expand Up @@ -401,16 +402,22 @@ def get_used_cache(self, **kwargs):

In case that the given output is a directory, it will also
include the `info` of its files.

Returns:
2-tuple of NamedCache objects in the form of
(directory `info`, file `info`).
If the given output is not a directory, the first tuple entry will
be None.
"""

if not self.use_cache:
return NamedCache()
return None, NamedCache()

if self.stage.is_repo_import:
cache = NamedCache()
(dep,) = self.stage.deps
cache.external[dep.repo_pair].add(dep.def_path)
return cache
return None, cache

if not self.checksum:
msg = (
Expand All @@ -429,16 +436,14 @@ def get_used_cache(self, **kwargs):
)
)
logger.warning(msg)
return NamedCache()
return None, NamedCache()

ret = NamedCache.make(self.scheme, self.checksum, str(self))

if not self.is_dir_checksum:
return ret
return None, ret

ret.update(self._collect_used_dir_cache(**kwargs))

return ret
return ret, self._collect_used_dir_cache(**kwargs)

@classmethod
def _validate_output_path(cls, path):
Expand Down
Loading