From e13f912d0e2856189aa80131c112c4fa0ea2df1f Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Sun, 9 Feb 2020 15:53:34 -0800 Subject: [PATCH 1/6] cleanup gdrive code, fix remote ids resolving --- dvc/remote/gdrive.py | 161 +++++++++++++++++++++++++------------------ 1 file changed, 93 insertions(+), 68 deletions(-) diff --git a/dvc/remote/gdrive.py b/dvc/remote/gdrive.py index 9cbca729e2..e7e0938c2d 100644 --- a/dvc/remote/gdrive.py +++ b/dvc/remote/gdrive.py @@ -1,8 +1,9 @@ import os import posixpath import logging -import threading import re +import threading +from urllib.parse import urlparse from funcy import retry, compose, decorator, wrap_with from funcy.py3 import cat @@ -23,12 +24,22 @@ class GDriveRetriableError(DvcException): pass +class GDrivePathNotFound(DvcException): + def __init__(self, path_info): + super().__init__("Google Drive path '{}' not found.".format(path_info)) + + class GDriveAccessTokenRefreshError(DvcException): - pass + def __init__(self): + super().__init__("Google Drive access token refreshment is failed.") class GDriveMissedCredentialKeyError(DvcException): - pass + def __init__(self, path): + super().__init__( + "Google Drive user credentials file '{}' " + "misses value for key.".format(path) + ) @decorator @@ -49,17 +60,32 @@ def _wrap_pydrive_retriable(call): gdrive_retry = compose( - # 8 tries, start at 0.5s, multiply by golden ratio, cap at 10s + # 15 tries, start at 0.5s, multiply by golden ratio, cap at 20s retry( - 8, GDriveRetriableError, timeout=lambda a: min(0.5 * 1.618 ** a, 10) + 15, GDriveRetriableError, timeout=lambda a: min(0.5 * 1.618 ** a, 20) ), _wrap_pydrive_retriable, ) +class GDriveURLInfo(CloudURLInfo): + def __init__(self, url): + super().__init__(url) + + # GDrive URL host part is case sensitive, + # we are restoring it here. + assert self.netloc == self.host + p = urlparse(url) + self.host = p.netloc + + # Normalize path. Important since we have a cache + # path to ID and don't want to deal with it everywhere in code + self._spath = re.sub("/{2,}", "/", self._spath.rstrip("/")) + + class RemoteGDrive(RemoteBASE): scheme = Schemes.GDRIVE - path_cls = CloudURLInfo + path_cls = GDriveURLInfo REQUIRES = {"pydrive2": "pydrive2"} DEFAULT_NO_TRAVERSE = False DEFAULT_VERIFY = True @@ -69,21 +95,19 @@ class RemoteGDrive(RemoteBASE): def __init__(self, repo, config): super().__init__(repo, config) - self.path_info = self.path_cls(config[Config.SECTION_REMOTE_URL]) + url = config[Config.SECTION_REMOTE_URL] + self.path_info = self.path_cls(url) - bucket = re.search( - "{}://(.*)".format(self.scheme), - config[Config.SECTION_REMOTE_URL], - re.IGNORECASE, - ) - self.bucket = ( - bucket.group(1).split("/")[0] if bucket else self.path_info.bucket - ) + if not self.path_info.bucket: + raise DvcException( + "Empty Google Drive URL '{}'. Learn more at " + "{}.".format( + url, format_link("https://man.dvc.org/remote/add") + ) + ) + self.bucket = self.path_info.bucket self.config = config - self.init_drive() - - def init_drive(self): self.client_id = self.config.get(Config.SECTION_GDRIVE_CLIENT_ID, None) self.client_secret = self.config.get( Config.SECTION_GDRIVE_CLIENT_SECRET, None @@ -91,7 +115,7 @@ def init_drive(self): if not self.client_id or not self.client_secret: raise DvcException( "Please specify Google Drive's client id and " - "secret in DVC's config. Learn more at " + "secret in DVC config. Learn more at " "{}.".format(format_link("https://man.dvc.org/remote/add")) ) self.gdrive_user_credentials_path = ( @@ -139,7 +163,7 @@ def gdrive_download_file( # it does not create a file on the remote gdrive_file = self.drive.CreateFile(param) bar_format = ( - "Donwloading {desc:{ncols_desc}.{ncols_desc}}... " + "Downloading {desc:{ncols_desc}.{ncols_desc}}... " + Tqdm.format_sizeof(int(gdrive_file["fileSize"]), "B", 1024) ) with Tqdm( @@ -165,11 +189,12 @@ def cache_root_dirs(self): cached_dirs = {} cached_ids = {} for dir1 in self.gdrive_list_item( - "'{}' in parents and trashed=false".format(self.remote_root_id) + "'{}' in parents and trashed=false".format(self._remote_root_id) ): remote_path = posixpath.join(self.path_info.path, dir1["title"]) cached_dirs.setdefault(remote_path, []).append(dir1["id"]) cached_ids[dir1["id"]] = dir1["title"] + return cached_dirs, cached_ids @property @@ -224,7 +249,6 @@ def drive(self): GoogleAuth.DEFAULT_SETTINGS["get_refresh_token"] = True GoogleAuth.DEFAULT_SETTINGS["oauth_scope"] = [ "https://www.googleapis.com/auth/drive", - # drive.appdata grants access to appDataFolder GDrive directory "https://www.googleapis.com/auth/drive.appdata", ] @@ -234,17 +258,12 @@ def drive(self): try: gauth.CommandLineAuth() except RefreshError as exc: - raise GDriveAccessTokenRefreshError( - "Google Drive's access token refreshment is failed" - ) from exc + raise GDriveAccessTokenRefreshError from exc except KeyError as exc: raise GDriveMissedCredentialKeyError( - "Google Drive's user credentials file '{}' " - "misses value for key '{}'".format( - self.gdrive_user_credentials_path, str(exc) - ) - ) - # Handle pydrive2.auth.AuthenticationError and others auth failures + self.gdrive_user_credentials_path + ) from exc + # Handle pydrive2.auth.AuthenticationError and other auth failures except Exception as exc: raise DvcException( "Google Drive authentication failed" @@ -256,11 +275,9 @@ def drive(self): self._gdrive = GoogleDrive(gauth) if self.bucket != "root" and self.bucket != "appDataFolder": - self.remote_drive_id = self.get_remote_drive_id(self.bucket) + self.remote_drive_id = self._get_remote_drive_id(self.bucket) self._corpora = "drive" if self.remote_drive_id else "default" - self.remote_root_id = self.get_remote_id( - self.path_info, create=True - ) + self._remote_root_id = self._get_remote_id(self.path_info) self._cached_dirs, self._cached_ids = self.cache_root_dirs() @@ -303,57 +320,65 @@ def get_remote_item(self, name, parents_ids): return next(iter(item_list), None) @gdrive_retry - def get_remote_drive_id(self, remote_id): + def _get_remote_drive_id(self, remote_id): param = {"id": remote_id} # it does not create a file on the remote item = self.drive.CreateFile(param) item.FetchMetadata("driveId") return item.get("driveId", None) - def resolve_remote_item_from_path(self, path_parts, create): - parents_ids = [self.bucket] - current_path = "" - for path_part in path_parts: - current_path = posixpath.join(current_path, path_part) - remote_ids = self.get_remote_id_from_cache(current_path) - if remote_ids: - parents_ids = remote_ids - continue - item = self.get_remote_item(path_part, parents_ids) - if not item and create: - item = self.create_remote_dir(parents_ids[0], path_part) - elif not item: - return None - parents_ids = [item["id"]] - return item - - def get_remote_id_from_cache(self, remote_path): + def _get_remote_ids_from_cache(self, remote_path): if hasattr(self, "_cached_dirs"): return self.cached_dirs.get(remote_path, []) return [] - def get_remote_id(self, path_info, create=False): - if not path_info.path and path_info.bucket: - # Case sensitive base path - return self.bucket - - remote_ids = self.get_remote_id_from_cache(path_info.path) + def _path_to_remote_ids(self, path, create): + if not path: + return [self.bucket] + if path == self.path_info.path and self._remote_root_id: + return [self._remote_root_id] + remote_ids = self._get_remote_ids_from_cache(path) if remote_ids: - return remote_ids[0] + return remote_ids - file1 = self.resolve_remote_item_from_path( - path_info.path.split("/"), create - ) - return file1["id"] if file1 else "" + if "/" in path: + parent_path, path_part = path.rsplit("/", 1) + else: + parent_path, path_part = ["", path] + + parent_ids = self._path_to_remote_ids(parent_path, create) + item = self.get_remote_item(path_part, parent_ids) + + if not item: + if create: + item = self.create_remote_dir(parent_ids[0], path_part) + else: + return None + + return [item["id"]] + + def _get_remote_id(self, path_info, create=False): + assert path_info.bucket == self.bucket + + remote_ids = self._path_to_remote_ids(path_info.path, create) + if not remote_ids: + raise GDrivePathNotFound(path_info) + + return remote_ids[0] def exists(self, path_info): - return self.get_remote_id(path_info) != "" + try: + self._get_remote_id(path_info) + except GDrivePathNotFound: + return False + else: + return True def _upload(self, from_file, to_info, name, no_progress_bar): dirname = to_info.parent if dirname: - parent_id = self.get_remote_id(dirname, True) + parent_id = self._get_remote_id(dirname, True) else: parent_id = to_info.bucket @@ -365,7 +390,7 @@ def _upload(self, from_file, to_info, name, no_progress_bar): ) def _download(self, from_info, to_file, name, no_progress_bar): - file_id = self.get_remote_id(from_info) + file_id = self._get_remote_id(from_info) self.gdrive_download_file(file_id, to_file, name, no_progress_bar) def all(self): From 35f14b929c352c2fbc141d6c865934237d06a589 Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Sun, 9 Feb 2020 16:44:13 -0800 Subject: [PATCH 2/6] fixes after testing GDrive cleanup --- dvc/remote/gdrive.py | 55 +++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/dvc/remote/gdrive.py b/dvc/remote/gdrive.py index ae578a5e1f..ea43eb7846 100644 --- a/dvc/remote/gdrive.py +++ b/dvc/remote/gdrive.py @@ -74,12 +74,12 @@ def __init__(self, url): # GDrive URL host part is case sensitive, # we are restoring it here. - assert self.netloc == self.host p = urlparse(url) self.host = p.netloc + assert self.netloc == self.host - # Normalize path. Important since we have a cache - # path to ID and don't want to deal with it everywhere in code + # Normalize path. Important since we have a cache (path to ID) + # and don't want to deal with different variations of path in it. self._spath = re.sub("/{2,}", "/", self._spath.rstrip("/")) @@ -97,6 +97,7 @@ def __init__(self, repo, config): super().__init__(repo, config) url = config[Config.SECTION_REMOTE_URL] self.path_info = self.path_cls(url) + self.config = config if not self.path_info.bucket: raise DvcException( @@ -106,19 +107,20 @@ def __init__(self, repo, config): ) ) - self.bucket = self.path_info.bucket - self.config = config - self.client_id = self.config.get(Config.SECTION_GDRIVE_CLIENT_ID, None) - self.client_secret = self.config.get( + self._bucket = self.path_info.bucket + self._client_id = self.config.get( + Config.SECTION_GDRIVE_CLIENT_ID, None + ) + self._client_secret = self.config.get( Config.SECTION_GDRIVE_CLIENT_SECRET, None ) - if not self.client_id or not self.client_secret: + if not self._client_id or not self._client_secret: raise DvcException( "Please specify Google Drive's client id and " "secret in DVC config. Learn more at " "{}.".format(format_link("https://man.dvc.org/remote/add")) ) - self.gdrive_user_credentials_path = ( + self._gdrive_user_credentials_path = ( tmp_fname(os.path.join(self.repo.tmp_dir, "")) if os.getenv(RemoteGDrive.GDRIVE_USER_CREDENTIALS_DATA) else self.config.get( @@ -128,7 +130,8 @@ def __init__(self, repo, config): ), ) ) - self.remote_drive_id = None + self._remote_drive_id = None + self._remote_root_id = None @gdrive_retry def gdrive_upload_file( @@ -174,8 +177,8 @@ def gdrive_download_file( def gdrive_list_item(self, query): param = {"q": query, "maxResults": 1000, "corpora": self.corpora} - if self.remote_drive_id: - param["driveId"] = self.remote_drive_id + if self._remote_drive_id: + param["driveId"] = self._remote_drive_id file_list = self.drive.ListFile(param) @@ -226,7 +229,7 @@ def drive(self): if os.getenv(RemoteGDrive.GDRIVE_USER_CREDENTIALS_DATA): with open( - self.gdrive_user_credentials_path, "w" + self._gdrive_user_credentials_path, "w" ) as credentials_file: credentials_file.write( os.getenv(RemoteGDrive.GDRIVE_USER_CREDENTIALS_DATA) @@ -234,8 +237,8 @@ def drive(self): GoogleAuth.DEFAULT_SETTINGS["client_config_backend"] = "settings" GoogleAuth.DEFAULT_SETTINGS["client_config"] = { - "client_id": self.client_id, - "client_secret": self.client_secret, + "client_id": self._client_id, + "client_secret": self._client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "revoke_uri": "https://oauth2.googleapis.com/revoke", @@ -245,7 +248,7 @@ def drive(self): GoogleAuth.DEFAULT_SETTINGS["save_credentials_backend"] = "file" GoogleAuth.DEFAULT_SETTINGS[ "save_credentials_file" - ] = self.gdrive_user_credentials_path + ] = self._gdrive_user_credentials_path GoogleAuth.DEFAULT_SETTINGS["get_refresh_token"] = True GoogleAuth.DEFAULT_SETTINGS["oauth_scope"] = [ "https://www.googleapis.com/auth/drive", @@ -261,7 +264,7 @@ def drive(self): raise GDriveAccessTokenRefreshError from exc except KeyError as exc: raise GDriveMissedCredentialKeyError( - self.gdrive_user_credentials_path + self._gdrive_user_credentials_path ) from exc # Handle pydrive2.auth.AuthenticationError and other auth failures except Exception as exc: @@ -270,13 +273,13 @@ def drive(self): ) from exc finally: if os.getenv(RemoteGDrive.GDRIVE_USER_CREDENTIALS_DATA): - os.remove(self.gdrive_user_credentials_path) + os.remove(self._gdrive_user_credentials_path) self._gdrive = GoogleDrive(gauth) - if self.bucket != "root" and self.bucket != "appDataFolder": - self.remote_drive_id = self._get_remote_drive_id(self.bucket) - self._corpora = "drive" if self.remote_drive_id else "default" + if self._bucket != "root" and self._bucket != "appDataFolder": + self._remote_drive_id = self._get_remote_drive_id(self._bucket) + self._corpora = "drive" if self._remote_drive_id else "default" self._remote_root_id = self._get_remote_id(self.path_info) self._cached_dirs, self._cached_ids = self.cache_root_dirs() @@ -319,8 +322,8 @@ def get_remote_item(self, name, parents_ids): "corpora": self.corpora, } - if self.remote_drive_id: - param["driveId"] = self.remote_drive_id + if self._remote_drive_id: + param["driveId"] = self._remote_drive_id # Limit found remote items count to 1 in response item_list = self.drive.ListFile(param).GetList() @@ -341,7 +344,7 @@ def _get_remote_ids_from_cache(self, remote_path): def _path_to_remote_ids(self, path, create): if not path: - return [self.bucket] + return [self._bucket] if path == self.path_info.path and self._remote_root_id: return [self._remote_root_id] @@ -366,7 +369,7 @@ def _path_to_remote_ids(self, path, create): return [item["id"]] def _get_remote_id(self, path_info, create=False): - assert path_info.bucket == self.bucket + assert path_info.bucket == self._bucket remote_ids = self._path_to_remote_ids(path_info.path, create) if not remote_ids: @@ -421,5 +424,5 @@ def all(self): logger.debug('Ignoring path as "non-cache looking"') def remove(self, path_info): - remote_id = self.get_remote_id(path_info) + remote_id = self._get_remote_id(path_info) self.delete_remote_file(remote_id) From 5ef0bb949f339f3641b4830c7acb73f51a2f90ad Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Sun, 9 Feb 2020 17:31:04 -0800 Subject: [PATCH 3/6] address CodeClimate complexity, simplify a litte bit --- dvc/remote/gdrive.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/dvc/remote/gdrive.py b/dvc/remote/gdrive.py index ea43eb7846..468a1ae033 100644 --- a/dvc/remote/gdrive.py +++ b/dvc/remote/gdrive.py @@ -337,18 +337,17 @@ def _get_remote_drive_id(self, remote_id): item.FetchMetadata("driveId") return item.get("driveId", None) - def _get_remote_ids_from_cache(self, remote_path): - if hasattr(self, "_cached_dirs"): - return self.cached_dirs.get(remote_path, []) - return [] - - def _path_to_remote_ids(self, path, create): + def _get_known_remote_ids(self, path): if not path: return [self._bucket] if path == self.path_info.path and self._remote_root_id: return [self._remote_root_id] + if hasattr(self, "_cached_dirs"): + return self.cached_dirs.get(path, []) + return [] - remote_ids = self._get_remote_ids_from_cache(path) + def _path_to_remote_ids(self, path, create): + remote_ids = self._get_known_remote_ids(path) if remote_ids: return remote_ids From ca0a5ca9a7f94a5ef293eee17067e7c72018c283 Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Sun, 9 Feb 2020 17:43:36 -0800 Subject: [PATCH 4/6] address CodeClimate complexity, further simplify complexity --- dvc/remote/gdrive.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dvc/remote/gdrive.py b/dvc/remote/gdrive.py index 468a1ae033..57876ab95f 100644 --- a/dvc/remote/gdrive.py +++ b/dvc/remote/gdrive.py @@ -351,11 +351,7 @@ def _path_to_remote_ids(self, path, create): if remote_ids: return remote_ids - if "/" in path: - parent_path, path_part = path.rsplit("/", 1) - else: - parent_path, path_part = ["", path] - + parent_path, path_part = posixpath.split(path) parent_ids = self._path_to_remote_ids(parent_path, create) item = self.get_remote_item(path_part, parent_ids) From 2aff0a0973088314bdc7484d370f03f98be528aa Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Mon, 10 Feb 2020 19:01:19 -0800 Subject: [PATCH 5/6] further refactoring need to fix GDrive tests --- dvc/remote/gdrive.py | 270 ++++++++++++++++++---------------- tests/func/test_data_cloud.py | 39 +++-- tests/remotes.py | 7 +- 3 files changed, 178 insertions(+), 138 deletions(-) diff --git a/dvc/remote/gdrive.py b/dvc/remote/gdrive.py index 57876ab95f..3ad4a78006 100644 --- a/dvc/remote/gdrive.py +++ b/dvc/remote/gdrive.py @@ -130,100 +130,21 @@ def __init__(self, repo, config): ), ) ) - self._remote_drive_id = None - self._remote_root_id = None - - @gdrive_retry - def gdrive_upload_file( - self, args, no_progress_bar=True, from_file="", progress_name="" - ): - parent = {"id": args["parent_id"]} - item = self.drive.CreateFile( - {"title": args["title"], "parents": [parent]} - ) - - with open(from_file, "rb") as fobj: - total = os.path.getsize(from_file) - with Tqdm.wrapattr( - fobj, - "read", - desc=progress_name, - total=total, - disable=no_progress_bar, - ) as wrapped: - # PyDrive doesn't like content property setting for empty files - # https://github.com/gsuitedevs/PyDrive/issues/121 - if total: - item.content = wrapped - item.Upload() - return item - @gdrive_retry - def gdrive_download_file( - self, file_id, to_file, progress_name, no_progress_bar - ): - param = {"id": file_id} - # it does not create a file on the remote - gdrive_file = self.drive.CreateFile(param) - bar_format = ( - "Downloading {desc:{ncols_desc}.{ncols_desc}}... " - + Tqdm.format_sizeof(int(gdrive_file["fileSize"]), "B", 1024) - ) - with Tqdm( - bar_format=bar_format, desc=progress_name, disable=no_progress_bar - ): - gdrive_file.GetContentFile(to_file) + self._list_params = None + self._gdrive = None - def gdrive_list_item(self, query): - param = {"q": query, "maxResults": 1000, "corpora": self.corpora} - - if self._remote_drive_id: - param["driveId"] = self._remote_drive_id - - file_list = self.drive.ListFile(param) - - # Isolate and decorate fetching of remote drive items in pages - get_list = gdrive_retry(lambda: next(file_list, None)) - - # Fetch pages until None is received, lazily flatten the thing - return cat(iter(get_list, None)) - - def cache_root_dirs(self): - cached_dirs = {} - cached_ids = {} - for dir1 in self.gdrive_list_item( - "'{}' in parents and trashed=false".format(self._remote_root_id) - ): - remote_path = posixpath.join(self.path_info.path, dir1["title"]) - cached_dirs.setdefault(remote_path, []).append(dir1["id"]) - cached_ids[dir1["id"]] = dir1["title"] - - return cached_dirs, cached_ids - - @property - def cached_dirs(self): - if not hasattr(self, "_cached_dirs"): - self.drive - return self._cached_dirs - - @property - def cached_ids(self): - if not hasattr(self, "_cached_ids"): - self.drive - return self._cached_ids - - @property - def corpora(self): - if not hasattr(self, "_corpora"): - self.drive - return self._corpora + self._cache_initialized = False + self._remote_root_id = None + self._cached_dirs = None + self._cached_ids = None @property @wrap_with(threading.RLock()) def drive(self): from pydrive2.auth import RefreshError - if not hasattr(self, "_gdrive"): + if not self._gdrive: from pydrive2.auth import GoogleAuth from pydrive2.drive import GoogleDrive @@ -277,17 +198,129 @@ def drive(self): self._gdrive = GoogleDrive(gauth) + return self._gdrive + + @wrap_with(threading.RLock()) + def _initialize_cache(self): + if self._cache_initialized: + return + + cached_dirs = {} + cached_ids = {} + self._remote_root_id = self._get_remote_id(self.path_info) + for dir1 in self.gdrive_list_item( + "'{}' in parents and trashed=false".format(self._remote_root_id) + ): + remote_path = posixpath.join(self.path_info.path, dir1["title"]) + cached_dirs.setdefault(remote_path, []).append(dir1["id"]) + cached_ids[dir1["id"]] = dir1["title"] + + self._cached_dirs = cached_dirs + self._cached_ids = cached_ids + self._cache_initialized = True + + @property + def cached_dirs(self): + if not self._cache_initialized: + self._initialize_cache() + return self._cached_dirs + + @property + def cached_ids(self): + if not self._cache_initialized: + self._initialize_cache() + return self._cached_ids + + @property + def remote_root_id(self): + if not self._cache_initialized: + self._initialize_cache() + return self._remote_root_id + + @property + def list_params(self): + if not self._list_params: + params = {"corpora": "default"} if self._bucket != "root" and self._bucket != "appDataFolder": - self._remote_drive_id = self._get_remote_drive_id(self._bucket) - self._corpora = "drive" if self._remote_drive_id else "default" - self._remote_root_id = self._get_remote_id(self.path_info) + params["driveId"] = self._get_remote_drive_id(self._bucket) + params["corpora"] = "drive" + self._list_params = params + return self._list_params - self._cached_dirs, self._cached_ids = self.cache_root_dirs() + @gdrive_retry + def gdrive_upload_file( + self, + parent_id, + title, + no_progress_bar=True, + from_file="", + progress_name="", + ): + item = self.drive.CreateFile( + {"title": title, "parents": [{"id": parent_id}]} + ) - return self._gdrive + with open(from_file, "rb") as fobj: + total = os.path.getsize(from_file) + with Tqdm.wrapattr( + fobj, + "read", + desc=progress_name, + total=total, + disable=no_progress_bar, + ) as wrapped: + # PyDrive doesn't like content property setting for empty files + # https://github.com/gsuitedevs/PyDrive/issues/121 + if total: + item.content = wrapped + item.Upload() + return item @gdrive_retry - def create_remote_dir(self, parent_id, title): + def gdrive_download_file( + self, file_id, to_file, progress_name, no_progress_bar + ): + param = {"id": file_id} + # it does not create a file on the remote + gdrive_file = self.drive.CreateFile(param) + bar_format = ( + "Downloading {desc:{ncols_desc}.{ncols_desc}}... " + + Tqdm.format_sizeof(int(gdrive_file["fileSize"]), "B", 1024) + ) + with Tqdm( + bar_format=bar_format, desc=progress_name, disable=no_progress_bar + ): + gdrive_file.GetContentFile(to_file) + + def gdrive_list_item(self, query): + param = {"q": query, "maxResults": 1000} + param.update(self.list_params) + + file_list = self.drive.ListFile(param) + + # Isolate and decorate fetching of remote drive items in pages + get_list = gdrive_retry(lambda: next(file_list, None)) + + # Fetch pages until None is received, lazily flatten the thing + return cat(iter(get_list, None)) + + @wrap_with(threading.RLock()) + def gdrive_create_dir(self, parent_id, title, remote_path): + if parent_id == self.remote_root_id: + cached = self.cached_dirs.get(remote_path, []) + if cached: + return cached[0] + + item = self._create_remote_dir(parent_id, title) + + if parent_id == self.remote_root_id: + self.cached_dirs.setdefault(remote_path, []).append(item["id"]) + self.cached_ids[item["id"]] = item["title"] + + return item["id"] + + @gdrive_retry + def _create_remote_dir(self, parent_id, title): parent = {"id": parent_id} item = self.drive.CreateFile( {"title": title, "parents": [parent], "mimeType": FOLDER_MIME_TYPE} @@ -296,14 +329,14 @@ def create_remote_dir(self, parent_id, title): return item @gdrive_retry - def delete_remote_file(self, remote_id): + def _delete_remote_file(self, remote_id): param = {"id": remote_id} # it does not create a file on the remote item = self.drive.CreateFile(param) item.Delete() @gdrive_retry - def get_remote_item(self, name, parents_ids): + def _get_remote_item(self, name, parents_ids): if not parents_ids: return None query = "({})".format( @@ -315,15 +348,10 @@ def get_remote_item(self, name, parents_ids): query += " and trashed=false and title='{}'".format(name) - param = { - "q": query, - # Remote might contain items with duplicated titles - "maxResults": 1, - "corpora": self.corpora, - } - - if self._remote_drive_id: - param["driveId"] = self._remote_drive_id + # Remote might contain items with duplicated path (titles). + # We thus limit number of items. + param = {"q": query, "maxResults": 1} + param.update(self.list_params) # Limit found remote items count to 1 in response item_list = self.drive.ListFile(param).GetList() @@ -337,29 +365,30 @@ def _get_remote_drive_id(self, remote_id): item.FetchMetadata("driveId") return item.get("driveId", None) - def _get_known_remote_ids(self, path): + def _get_cached_remote_ids(self, path): if not path: return [self._bucket] - if path == self.path_info.path and self._remote_root_id: - return [self._remote_root_id] - if hasattr(self, "_cached_dirs"): + if self._cache_initialized: + if path == self.path_info.path: + return [self.remote_root_id] return self.cached_dirs.get(path, []) return [] def _path_to_remote_ids(self, path, create): - remote_ids = self._get_known_remote_ids(path) + remote_ids = self._get_cached_remote_ids(path) if remote_ids: return remote_ids - parent_path, path_part = posixpath.split(path) + parent_path, part = posixpath.split(path) parent_ids = self._path_to_remote_ids(parent_path, create) - item = self.get_remote_item(path_part, parent_ids) + item = self._get_remote_item(part, parent_ids) if not item: - if create: - item = self.create_remote_dir(parent_ids[0], path_part) - else: - return None + return ( + [self.gdrive_create_dir(parent_ids[0], part, path)] + if create + else [] + ) return [item["id"]] @@ -382,16 +411,11 @@ def exists(self, path_info): def _upload(self, from_file, to_info, name, no_progress_bar): dirname = to_info.parent - if dirname: - parent_id = self._get_remote_id(dirname, True) - else: - parent_id = to_info.bucket + assert dirname + parent_id = self._get_remote_id(dirname, True) self.gdrive_upload_file( - {"title": to_info.name, "parent_id": parent_id}, - no_progress_bar, - from_file, - name, + parent_id, to_info.name, no_progress_bar, from_file, name ) def _download(self, from_info, to_file, name, no_progress_bar): @@ -420,4 +444,4 @@ def all(self): def remove(self, path_info): remote_id = self._get_remote_id(path_info) - self.delete_remote_file(remote_id) + self._delete_remote_file(remote_id) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index f4de706312..e5481d52f6 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -194,24 +194,31 @@ def _get_cloud_class(self): return RemoteS3 +def setup_gdrive_cloud(remote_url, dvc): + config = copy.deepcopy(TEST_CONFIG) + config[TEST_SECTION][Config.SECTION_REMOTE_URL] = remote_url + config[TEST_SECTION][ + Config.SECTION_GDRIVE_CLIENT_ID + ] = TEST_GDRIVE_CLIENT_ID + config[TEST_SECTION][ + Config.SECTION_GDRIVE_CLIENT_SECRET + ] = TEST_GDRIVE_CLIENT_SECRET + + dvc.config.config = config + cloud = DataCloud(dvc) + remote = cloud.get_remote() + remote._create_remote_dir("root", remote.path_info.path) + + class TestRemoteGDrive(GDrive, TestDataCloudBase): def _setup_cloud(self): self._ensure_should_run() - repo = self.get_url() + setup_gdrive_cloud(self.get_url(), self.dvc) - config = copy.deepcopy(TEST_CONFIG) - config[TEST_SECTION][Config.SECTION_REMOTE_URL] = repo - config[TEST_SECTION][ - Config.SECTION_GDRIVE_CLIENT_ID - ] = TEST_GDRIVE_CLIENT_ID - config[TEST_SECTION][ - Config.SECTION_GDRIVE_CLIENT_SECRET - ] = TEST_GDRIVE_CLIENT_SECRET - self.dvc.config.config = config self.cloud = DataCloud(self.dvc) - - self.assertIsInstance(self.cloud.get_remote(), self._get_cloud_class()) + remote = self.cloud.get_remote() + self.assertIsInstance(remote, self._get_cloud_class()) def _get_cloud_class(self): return RemoteGDrive @@ -303,7 +310,12 @@ def should_test(): def get_url(): raise NotImplementedError + def _setup_cloud(self): + pass + def _test_cloud(self, remote=None): + self._setup_cloud() + args = ["-v", "-j", "2"] if remote: args += ["-r", remote] @@ -402,6 +414,9 @@ def _test(self): class TestRemoteGDriveCLI(GDrive, TestDataCloudCLIBase): + def _setup_cloud(self): + setup_gdrive_cloud(self.get_url(), self.dvc) + def _test(self): url = self.get_url() diff --git a/tests/remotes.py b/tests/remotes.py index 35876b6768..b446374517 100644 --- a/tests/remotes.py +++ b/tests/remotes.py @@ -150,9 +150,10 @@ class GDrive: def should_test(): return os.getenv(RemoteGDrive.GDRIVE_USER_CREDENTIALS_DATA) is not None - @staticmethod - def get_url(): - return "gdrive://root/" + str(uuid.uuid4()) + def get_url(self): + if not getattr(self, "_remote_url", None): + self._remote_url = "gdrive://root/" + str(uuid.uuid4()) + return self._remote_url class Azure: From 85fef88067ce1fdc9c18f7f013f9c4e2d78e6d67 Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Mon, 10 Feb 2020 19:04:53 -0800 Subject: [PATCH 6/6] tests: minor setup gdrive cloud simplification --- tests/func/test_data_cloud.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index e5481d52f6..2572bb9a49 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -205,8 +205,7 @@ def setup_gdrive_cloud(remote_url, dvc): ] = TEST_GDRIVE_CLIENT_SECRET dvc.config.config = config - cloud = DataCloud(dvc) - remote = cloud.get_remote() + remote = DataCloud(dvc).get_remote() remote._create_remote_dir("root", remote.path_info.path)