From d6b45983837718a55c0bdd2f88b052af4ccccfad Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Mon, 15 Mar 2021 15:47:27 +0300 Subject: [PATCH 01/30] s3: migrate to s3fs --- dvc/fs/s3.py | 471 ++++++-------------------------------- tests/remotes/__init__.py | 2 +- tests/remotes/s3.py | 54 ++++- 3 files changed, 117 insertions(+), 410 deletions(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index 17176058a3..b443f532ac 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -1,102 +1,58 @@ -import logging import os import threading -from contextlib import contextmanager +from collections import defaultdict from funcy import cached_property, wrap_prop -from dvc.config import ConfigError -from dvc.exceptions import DvcException, ETagMismatchError -from dvc.hash_info import HashInfo from dvc.path_info import CloudURLInfo -from dvc.progress import Tqdm from dvc.scheme import Schemes -from dvc.utils import conversions, error_link -from .base import BaseFileSystem - -logger = logging.getLogger(__name__) +from .fsspec_wrapper import FSSpecWrapper _AWS_CONFIG_PATH = os.path.join(os.path.expanduser("~"), ".aws", "config") -class S3FileSystem(BaseFileSystem): +class S3FileSystem(FSSpecWrapper): scheme = Schemes.S3 PATH_CLS = CloudURLInfo - REQUIRES = {"boto3": "boto3"} + REQUIRES = {"s3fs": "s3fs"} PARAM_CHECKSUM = "etag" DETAIL_FIELDS = frozenset(("etag", "size")) + _GRANTS = { + "grant_full_control": "GrantFullControl", + "grant_read": "GrantRead", + "grant_read_acp": "GrantReadACP", + "grant_write_acp": "GrantWriteACP", + } + def __init__(self, repo, config): super().__init__(repo, config) url = config.get("url", "s3://") self.path_info = self.PATH_CLS(url) - self.region = config.get("region") - self.profile = config.get("profile") - self.endpoint_url = config.get("endpointurl") - - self.use_ssl = config.get("use_ssl", True) - - self.extra_args = {} - - self.sse = config.get("sse") - if self.sse: - self.extra_args["ServerSideEncryption"] = self.sse - - self.sse_kms_key_id = config.get("sse_kms_key_id") - if self.sse_kms_key_id: - self.extra_args["SSEKMSKeyId"] = self.sse_kms_key_id - - self.acl = config.get("acl") - if self.acl: - self.extra_args["ACL"] = self.acl - - self._append_aws_grants_to_extra_args(config) - - self.access_key_id = config.get("access_key_id") - self.secret_access_key = config.get("secret_access_key") - self.session_token = config.get("session_token") - - shared_creds = config.get("credentialpath") - if shared_creds: - os.environ.setdefault("AWS_SHARED_CREDENTIALS_FILE", shared_creds) - - config_path = config.get("configpath") - if config_path: - os.environ.setdefault("AWS_CONFIG_FILE", config_path) - self._transfer_config = None - - # https://github.com/aws/aws-cli/blob/0376c6262d6b15dc36c82e6da6e1aad10249cc8c/awscli/customizations/s3/transferconfig.py#L107-L113 - _TRANSFER_CONFIG_ALIASES = { - "max_queue_size": "max_io_queue", - "max_concurrent_requests": "max_concurrency", - "multipart_threshold": "multipart_threshold", - "multipart_chunksize": "multipart_chunksize", - } + self.transfer_config = {} + self.login_info = self._prepare_credentials(config) - def _transform_config(self, s3_config): + def _split_s3_config(self, s3_config): """Splits the general s3 config into 2 different config objects, one for transfer.TransferConfig and other is the general session config""" + from dvc.utils import conversions - config, transfer_config = {}, {} + config = {} for key, value in s3_config.items(): - if key in self._TRANSFER_CONFIG_ALIASES: - if key in {"multipart_chunksize", "multipart_threshold"}: - # cast human readable sizes (like 24MiB) to integers - value = conversions.human_readable_to_bytes(value) - else: - value = int(value) - transfer_config[self._TRANSFER_CONFIG_ALIASES[key]] = value + if key in {"multipart_chunksize", "multipart_threshold"}: + self.transfer_config[ + key + ] = conversions.human_readable_to_bytes(value) else: config[key] = value - return config, transfer_config + return config - def _process_config(self): - from boto3.s3.transfer import TransferConfig + def _load_aws_config_file(self, profile): from botocore.configloader import load_config config_path = os.environ.get("AWS_CONFIG_FILE", _AWS_CONFIG_PATH) @@ -104,359 +60,70 @@ def _process_config(self): return None config = load_config(config_path) - profile = config["profiles"].get(self.profile or "default") - if not profile: + profile_config = config["profiles"].get(profile or "default") + if not profile_config: return None - s3_config = profile.get("s3", {}) - s3_config, transfer_config = self._transform_config(s3_config) - self._transfer_config = TransferConfig(**transfer_config) - return s3_config + s3_config = profile_config.get("s3", {}) + return self._split_s3_config(s3_config) - @wrap_prop(threading.Lock()) - @cached_property - def s3(self): - import boto3 - - session_opts = { - "profile_name": self.profile, - "region_name": self.region, - } - - if self.access_key_id: - session_opts["aws_access_key_id"] = self.access_key_id - if self.secret_access_key: - session_opts["aws_secret_access_key"] = self.secret_access_key - if self.session_token: - session_opts["aws_session_token"] = self.session_token - - session = boto3.session.Session(**session_opts) - s3_config = self._process_config() - - return session.resource( - "s3", - endpoint_url=self.endpoint_url, - use_ssl=self.use_ssl, - config=boto3.session.Config( - signature_version="s3v4", s3=s3_config - ), - ) + def _prepare_credentials(self, config): + from dvc.config import ConfigError + from dvc.utils.flatten import flatten, unflatten - @contextmanager - def _get_s3(self): - from botocore.exceptions import ( - EndpointConnectionError, - NoCredentialsError, - ) - - try: - yield self.s3 - except NoCredentialsError as exc: - link = error_link("no-credentials") - raise DvcException( - f"Unable to find AWS credentials. {link}" - ) from exc - except EndpointConnectionError as exc: - link = error_link("connection-error") - name = self.endpoint_url or "AWS S3" - raise DvcException( - f"Unable to connect to '{name}'. {link}" - ) from exc + login_info = defaultdict(dict) - @contextmanager - def _get_bucket(self, bucket): - with self._get_s3() as s3: - try: - yield s3.Bucket(bucket) - except s3.meta.client.exceptions.NoSuchBucket as exc: - link = error_link("no-bucket") - raise DvcException( - f"Bucket '{bucket}' does not exist. {link}" - ) from exc + # credentials + login_info["key"] = config.get("access_key_id") + login_info["secret"] = config.get("secret_access_key") + login_info["token"] = config.get("session_token") - @contextmanager - def _get_obj(self, path_info): - with self._get_bucket(path_info.bucket) as bucket: - try: - yield bucket.Object(path_info.path) - except bucket.meta.client.exceptions.NoSuchKey as exc: - raise DvcException(f"{path_info.url} does not exist") from exc + # session configuration + login_info["profile"] = config.get("profile") + login_info["use_ssl"] = config.get("use_ssl", True) - def _append_aws_grants_to_extra_args(self, config): - # Keys for extra_args can be one of the following list: - # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS - """ - ALLOWED_UPLOAD_ARGS = [ - 'ACL', 'CacheControl', 'ContentDisposition', 'ContentEncoding', - 'ContentLanguage', 'ContentType', 'Expires', 'GrantFullControl', - 'GrantRead', 'GrantReadACP', 'GrantWriteACP', 'Metadata', - 'RequestPayer', 'ServerSideEncryption', 'StorageClass', - 'SSECustomerAlgorithm', 'SSECustomerKey', 'SSECustomerKeyMD5', - 'SSEKMSKeyId', 'WebsiteRedirectLocation' - ] - """ + # extra client configuration + client = login_info["client_kwargs"] + client["region_name"] = config.get("region") + client["endpoint_url"] = config.get("endpointurl") - grants = { - "grant_full_control": "GrantFullControl", - "grant_read": "GrantRead", - "grant_read_acp": "GrantReadACP", - "grant_write_acp": "GrantWriteACP", - } - - for grant_option, extra_args_key in grants.items(): + # encryptions + additional = login_info["s3_additional_kwargs"] + additional["ServerSideEncryption"] = config.get("sse") + additional["sse_kms_key_id"] = config.get("sse_kms_key_id") + additional["ACL"] = config.get("acl") + for grant_option, grant_key in self._GRANTS.items(): if config.get(grant_option): - if self.acl: + if additional["ACL"]: raise ConfigError( "`acl` and `grant_*` AWS S3 config options " "are mutually exclusive" ) + additional[grant_key] = config[grant_option] - self.extra_args[extra_args_key] = config.get(grant_option) - - def _generate_download_url(self, path_info, expires=3600): - params = {"Bucket": path_info.bucket, "Key": path_info.path} - with self._get_s3() as s3: - return s3.meta.client.generate_presigned_url( - ClientMethod="get_object", - Params=params, - ExpiresIn=int(expires), - ) - - def exists(self, path_info, use_dvcignore=True): - """Check if the blob exists. If it does not exist, - it could be a part of a directory path. - - eg: if `data/file.txt` exists, check for `data` should return True - """ - return self.isfile(path_info) or self.isdir(path_info) - - def isdir(self, path_info): - # S3 doesn't have a concept for directories. - # - # Using `head_object` with a path pointing to a directory - # will throw a 404 error. - # - # A reliable way to know if a given path is a directory is by - # checking if there are more files sharing the same prefix - # with a `list_objects` call. - # - # We need to make sure that the path ends with a forward slash, - # since we can end with false-positives like the following example: - # - # bucket - # └── data - # ├── alice - # └── alpha - # - # Using `data/al` as prefix will return `[data/alice, data/alpha]`, - # While `data/al/` will return nothing. - # - dir_path = path_info / "" - return bool(list(self._list_paths(dir_path, max_items=1))) - - def isfile(self, path_info): - if path_info.path.endswith("/"): - return False - - return path_info.path in self._list_paths(path_info) - - def info(self, path_info): - # temporary workaround, will be removed when migrating to s3fs - if self.isdir(path_info): - return {"type": "directory"} - - with self._get_obj(path_info) as obj: - return { - "type": "file", - "etag": obj.e_tag.strip('"'), - "size": obj.content_length, - } - - def _list_paths(self, path_info, max_items=None): - with self._get_bucket(path_info.bucket) as bucket: - obj_summaries = bucket.objects.filter(Prefix=path_info.path) - if max_items is not None: - obj_summaries = obj_summaries.page_size(max_items).limit( - max_items - ) - for obj_summary in obj_summaries: - yield obj_summary.key - - def walk_files(self, path_info, **kwargs): - if not kwargs.pop("prefix", False): - path_info = path_info / "" - for fname in self._list_paths(path_info, **kwargs): - if fname.endswith("/"): - continue - - yield path_info.replace(path=fname) - - def ls( - self, path_info, detail=False, recursive=False - ): # pylint: disable=arguments-differ - assert recursive - - with self._get_bucket(path_info.bucket) as bucket: - for obj_summary in bucket.objects.filter(Prefix=path_info.path): - if detail: - yield { - "type": "file", - "name": obj_summary.key, - "size": obj_summary.size, - "etag": obj_summary.e_tag.strip('"'), - } - else: - yield obj_summary.key - - def remove(self, path_info): - if path_info.scheme != "s3": - raise NotImplementedError - - logger.debug(f"Removing {path_info}") - with self._get_obj(path_info) as obj: - obj.delete() - - def makedirs(self, path_info): - # We need to support creating empty directories, which means - # creating an object with an empty body and a trailing slash `/`. - # - # We are not creating directory objects for every parent prefix, - # as it is not required. - if not path_info.path: - return - - dir_path = path_info / "" - with self._get_obj(dir_path) as obj: - obj.put(Body="") - - def copy(self, from_info, to_info): - with self._get_s3() as s3: - self._copy(s3.meta.client, from_info, to_info, self.extra_args) - - @classmethod - def _copy_multipart( - cls, s3, from_info, to_info, size, n_parts, extra_args - ): - mpu = s3.create_multipart_upload( - Bucket=to_info.bucket, Key=to_info.path, **extra_args + # config kwargs + session_config = login_info["config_kwargs"] + session_config["s3"] = self._load_aws_config_file( + login_info["profile"] ) - mpu_id = mpu["UploadId"] - - parts = [] - byte_position = 0 - for i in range(1, n_parts + 1): - obj = s3.head_object( - Bucket=from_info.bucket, Key=from_info.path, PartNumber=i - ) - part_size = obj["ContentLength"] - lastbyte = byte_position + part_size - 1 - if lastbyte > size: - lastbyte = size - 1 - - srange = f"bytes={byte_position}-{lastbyte}" - part = s3.upload_part_copy( - Bucket=to_info.bucket, - Key=to_info.path, - PartNumber=i, - UploadId=mpu_id, - CopySourceRange=srange, - CopySource={"Bucket": from_info.bucket, "Key": from_info.path}, - ) - parts.append( - {"PartNumber": i, "ETag": part["CopyPartResult"]["ETag"]} - ) - byte_position += part_size - - assert n_parts == len(parts) - - s3.complete_multipart_upload( - Bucket=to_info.bucket, - Key=to_info.path, - UploadId=mpu_id, - MultipartUpload={"Parts": parts}, + return unflatten( + { + key: value + for key, value in flatten(login_info).items() + if value is not None + } ) - @classmethod - def _copy(cls, s3, from_info, to_info, extra_args): - # NOTE: object's etag depends on the way it was uploaded to s3 or the - # way it was copied within the s3. More specifically, it depends on - # the chunk size that was used to transfer it, which would affect - # whether an object would be uploaded as a single part or as a - # multipart. - # - # If an object's etag looks like '8978c98bb5a48c2fb5f2c4c905768afa', - # then it was transferred as a single part, which means that the chunk - # size used to transfer it was greater or equal to the ContentLength - # of that object. So to preserve that tag over the next transfer, we - # could use any value >= ContentLength. - # - # If an object's etag looks like '50d67013a5e1a4070bef1fc8eea4d5f9-13', - # then it was transferred as a multipart, which means that the chunk - # size used to transfer it was less than ContentLength of that object. - # Unfortunately, in general, it doesn't mean that the chunk size was - # the same throughout the transfer, so it means that in order to - # preserve etag, we need to transfer each part separately, so the - # object is transfered in the same chunks as it was originally. - from boto3.s3.transfer import TransferConfig + def _entry_hook(self, entry): + entry = entry.copy() + if "ETag" in entry: + entry["etag"] = entry["ETag"] + return entry - obj = s3.head_object(Bucket=from_info.bucket, Key=from_info.path) - etag = obj["ETag"].strip('"') - size = obj["ContentLength"] - - _, _, parts_suffix = etag.partition("-") - if parts_suffix: - n_parts = int(parts_suffix) - cls._copy_multipart( - s3, from_info, to_info, size, n_parts, extra_args=extra_args - ) - else: - source = {"Bucket": from_info.bucket, "Key": from_info.path} - s3.copy( - source, - to_info.bucket, - to_info.path, - ExtraArgs=extra_args, - Config=TransferConfig(multipart_threshold=size + 1), - ) - - cached_etag = s3.head_object(Bucket=to_info.bucket, Key=to_info.path)[ - "ETag" - ].strip('"') - if etag != cached_etag: - raise ETagMismatchError(etag, cached_etag) - - def etag(self, path_info): - with self._get_obj(path_info) as obj: - return HashInfo("etag", size=obj.content_length,) - - def _upload_fobj(self, fobj, to_info): - with self._get_obj(to_info) as obj: - obj.upload_fileobj(fobj, Config=self._transfer_config) - - def _upload( - self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs - ): - with self._get_obj(to_info) as obj: - total = os.path.getsize(from_file) - with Tqdm( - disable=no_progress_bar, total=total, bytes=True, desc=name - ) as pbar: - obj.upload_file( - from_file, - Callback=pbar.update, - ExtraArgs=self.extra_args, - Config=self._transfer_config, - ) + @wrap_prop(threading.Lock()) + @cached_property + def fs(self): + from s3fs import S3FileSystem as _S3FileSystem - def _download(self, from_info, to_file, name=None, no_progress_bar=False): - with self._get_obj(from_info) as obj: - with Tqdm( - disable=no_progress_bar, - total=obj.content_length, - bytes=True, - desc=name, - ) as pbar: - obj.download_file( - to_file, Callback=pbar.update, Config=self._transfer_config - ) + return _S3FileSystem(**self.login_info) diff --git a/tests/remotes/__init__.py b/tests/remotes/__init__.py index 72fa6e1a61..d91692d63c 100644 --- a/tests/remotes/__init__.py +++ b/tests/remotes/__init__.py @@ -24,7 +24,7 @@ oss_server, real_oss, ) -from .s3 import S3, TEST_AWS_REPO_BUCKET, real_s3, s3 # noqa: F401 +from .s3 import S3, TEST_AWS_REPO_BUCKET, real_s3, s3, s3_server # noqa: F401 from .ssh import ( # noqa: F401; noqa: F401 SSHMocked, ssh, diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index b0517c221e..b38a2ee79d 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -1,10 +1,10 @@ import locale import os +import time import uuid import pytest from funcy import cached_property -from moto import mock_s3 from dvc.path_info import CloudURLInfo from dvc.utils import env2bool @@ -13,11 +13,18 @@ TEST_AWS_REPO_BUCKET = os.environ.get("DVC_TEST_AWS_REPO_BUCKET", "dvc-temp") +TEST_AWS_S3_PORT = 5555 +TEST_AWS_ENDPOINT_URL = f"http://127.0.0.1:{TEST_AWS_S3_PORT}/" + class S3(Base, CloudURLInfo): IS_OBJECT_STORAGE = True + @cached_property + def config(self): + return {"url": self.url, "endpointurl": TEST_AWS_ENDPOINT_URL} + @staticmethod def should_test(): do_test = env2bool("DVC_TEST_AWS", undefined=None) @@ -49,7 +56,7 @@ def get_url(): def _s3(self): import boto3 - return boto3.client("s3") + return boto3.client("s3", endpoint_url=TEST_AWS_ENDPOINT_URL) def is_file(self): from botocore.exceptions import ClientError @@ -94,15 +101,48 @@ def read_text(self, encoding=None, errors=None): return self.read_bytes().decode(encoding) +# Due to moto being uncompatible with aioboto on wrapper +# mode, we start the moto server as a subprocess (imitates +# a real S3 service) and then create a client that uses +# this server. +# +# Originally adopted from: +# https://github.com/dask/s3fs/blob/main/s3fs/tests/test_s3fs.py#L66-L86 @pytest.fixture -def s3(test_config): +def s3_server(test_config): test_config.requires("s3") - with mock_s3(): - import boto3 - boto3.client("s3").create_bucket(Bucket=TEST_AWS_REPO_BUCKET) + import shlex + import subprocess + + import requests + + proc = subprocess.Popen( + shlex.split(f"moto_server s3 -p {TEST_AWS_S3_PORT}") + ) - yield S3(S3.get_url()) + timeout = 5 + while timeout > 0: + try: + r = requests.get(TEST_AWS_ENDPOINT_URL) + if r.ok: + break + except requests.RequestException: + pass + timeout -= 0.1 + time.sleep(0.1) + + yield + + proc.terminate() + proc.wait() + + +@pytest.fixture +def s3(s3_server): + workspace = S3(S3.get_url()) + workspace._s3.create_bucket(Bucket=TEST_AWS_REPO_BUCKET) + yield workspace @pytest.fixture From 396d90c80127a7540cbcbe0b4a41855db61789c9 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Mon, 15 Mar 2021 23:24:26 +0300 Subject: [PATCH 02/30] s3: strip etag --- dvc/fs/s3.py | 2 +- tests/func/test_import_url.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index b443f532ac..d4edd238ab 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -118,7 +118,7 @@ def _prepare_credentials(self, config): def _entry_hook(self, entry): entry = entry.copy() if "ETag" in entry: - entry["etag"] = entry["ETag"] + entry["etag"] = entry["ETag"].strip('"') return entry @wrap_prop(threading.Lock()) diff --git a/tests/func/test_import_url.py b/tests/func/test_import_url.py index 28d03ab231..8d191511e7 100644 --- a/tests/func/test_import_url.py +++ b/tests/func/test_import_url.py @@ -363,8 +363,8 @@ def test_import_url_to_remote_invalid_combinations(dvc): @pytest.mark.parametrize( "workspace", [ - pytest.lazy_fixture("s3"), pytest.lazy_fixture("hdfs"), + pytest.param(pytest.lazy_fixture("s3"), marks=empty_xfail), pytest.param(pytest.lazy_fixture("gs"), marks=empty_xfail), pytest.param(pytest.lazy_fixture("azure"), marks=empty_xfail), pytest.param( From a0c81ab519ea9e10f5cfcd3eaa17f64fad59ad3c Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Mon, 15 Mar 2021 23:55:23 +0300 Subject: [PATCH 03/30] s3: disable instance cache --- dvc/fs/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index d4edd238ab..35572345c7 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -126,4 +126,4 @@ def _entry_hook(self, entry): def fs(self): from s3fs import S3FileSystem as _S3FileSystem - return _S3FileSystem(**self.login_info) + return _S3FileSystem(**self.login_info, skip_instance_cache=True) From 214319e808484478eacb73a625794799fe009360 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 16 Mar 2021 10:26:35 +0300 Subject: [PATCH 04/30] tests: use the proper remote:// form in test_checkout --- tests/func/test_checkout.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index bfd1112293..e381633c9b 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -856,7 +856,7 @@ def test_checkout_external_modified_file(tmp_dir, dvc, scm, mocker, workspace): # was attempted without force, dvc checks if it's present in its cache # before asking user to remove it. workspace.gen("foo", "foo") - dvc.add(str(workspace / "foo"), external=True) + dvc.add("remote://workspace/foo", external=True) scm.add(["foo.dvc"]) scm.commit("add foo") From 2ffc53f5df28f50e8c9709e005acc1e48a005790 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 17 Mar 2021 17:29:02 +0300 Subject: [PATCH 05/30] tests: move/drop some tests --- dvc/fs/s3.py | 15 +- tests/func/test_s3.py | 219 -------------------------- tests/unit/fs/test_s3.py | 120 ++++++++------ tests/unit/remote/test_remote_tree.py | 31 +--- 4 files changed, 91 insertions(+), 294 deletions(-) delete mode 100644 tests/func/test_s3.py diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index 35572345c7..7e9473e60e 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -32,7 +32,7 @@ def __init__(self, repo, config): url = config.get("url", "s3://") self.path_info = self.PATH_CLS(url) - self.transfer_config = {} + self._open_args = {} self.login_info = self._prepare_credentials(config) def _split_s3_config(self, s3_config): @@ -44,8 +44,8 @@ def _split_s3_config(self, s3_config): config = {} for key, value in s3_config.items(): if key in {"multipart_chunksize", "multipart_threshold"}: - self.transfer_config[ - key + self._open_args[ + "block_size" ] = conversions.human_readable_to_bytes(value) else: config[key] = value @@ -90,7 +90,7 @@ def _prepare_credentials(self, config): # encryptions additional = login_info["s3_additional_kwargs"] additional["ServerSideEncryption"] = config.get("sse") - additional["sse_kms_key_id"] = config.get("sse_kms_key_id") + additional["SSEKMSKeyId"] = config.get("sse_kms_key_id") additional["ACL"] = config.get("acl") for grant_option, grant_key in self._GRANTS.items(): if config.get(grant_option): @@ -127,3 +127,10 @@ def fs(self): from s3fs import S3FileSystem as _S3FileSystem return _S3FileSystem(**self.login_info, skip_instance_cache=True) + + def open( + self, path_info, mode="r", **kwargs + ): # pylint: disable=arguments-differ + return self.fs.open( + self._with_bucket(path_info), mode=mode, **self._open_args + ) diff --git a/tests/func/test_s3.py b/tests/func/test_s3.py deleted file mode 100644 index c16b3a4e88..0000000000 --- a/tests/func/test_s3.py +++ /dev/null @@ -1,219 +0,0 @@ -import importlib -import sys -import textwrap -from functools import wraps - -import boto3 -import moto.s3.models as s3model -import pytest -from moto import mock_s3 - -from dvc.fs.s3 import S3FileSystem -from dvc.objects.db.base import ObjectDB -from tests.remotes import S3 - -# from https://github.com/spulec/moto/blob/v1.3.5/tests/test_s3/test_s3.py#L40 -REDUCED_PART_SIZE = 256 - - -def reduced_min_part_size(f): - """ speed up tests by temporarily making the multipart minimum part size - small - """ - orig_size = s3model.UPLOAD_PART_MIN_SIZE - - @wraps(f) - def wrapped(*args, **kwargs): - try: - s3model.UPLOAD_PART_MIN_SIZE = REDUCED_PART_SIZE - return f(*args, **kwargs) - finally: - s3model.UPLOAD_PART_MIN_SIZE = orig_size - - return wrapped - - -def _get_src_dst(): - base_info = S3FileSystem.PATH_CLS(S3.get_url()) - return base_info / "from", base_info / "to" - - -@mock_s3 -def test_copy_singlepart_preserve_etag(): - from_info, to_info = _get_src_dst() - - s3 = boto3.client("s3") - s3.create_bucket(Bucket=from_info.bucket) - s3.put_object(Bucket=from_info.bucket, Key=from_info.path, Body="data") - - S3FileSystem._copy(s3, from_info, to_info, {}) - - -@mock_s3 -@pytest.mark.parametrize( - "base_info", - [ - S3FileSystem.PATH_CLS("s3://bucket/"), - S3FileSystem.PATH_CLS("s3://bucket/ns/"), - ], -) -def test_link_created_on_non_nested_path(base_info, tmp_dir, dvc, scm): - from dvc.checkout import _link - - fs = S3FileSystem(dvc, {"url": str(base_info.parent)}) - odb = ObjectDB(fs) - s3 = odb.fs.s3.meta.client - s3.create_bucket(Bucket=base_info.bucket) - s3.put_object( - Bucket=base_info.bucket, Key=(base_info / "from").path, Body="data" - ) - _link(odb, base_info / "from", base_info / "to") - - assert odb.fs.exists(base_info / "from") - assert odb.fs.exists(base_info / "to") - - -@mock_s3 -def test_makedirs_doesnot_try_on_top_level_paths(tmp_dir, dvc, scm): - base_info = S3FileSystem.PATH_CLS("s3://bucket/") - fs = S3FileSystem(dvc, {"url": str(base_info)}) - fs.makedirs(base_info) - - -def _upload_multipart(s3, Bucket, Key): - mpu = s3.create_multipart_upload(Bucket=Bucket, Key=Key) - mpu_id = mpu["UploadId"] - - parts = [] - n_parts = 10 - for i in range(1, n_parts + 1): - # NOTE: Generation parts of variable size. Part size should be at - # least 5MB: - # https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html - part_size = REDUCED_PART_SIZE + i - body = b"1" * part_size - part = s3.upload_part( - Bucket=Bucket, - Key=Key, - PartNumber=i, - UploadId=mpu_id, - Body=body, - ContentLength=len(body), - ) - parts.append({"PartNumber": i, "ETag": part["ETag"]}) - - s3.complete_multipart_upload( - Bucket=Bucket, - Key=Key, - UploadId=mpu_id, - MultipartUpload={"Parts": parts}, - ) - - -@mock_s3 -@reduced_min_part_size -def test_copy_multipart_preserve_etag(): - from_info, to_info = _get_src_dst() - - s3 = boto3.client("s3") - s3.create_bucket(Bucket=from_info.bucket) - _upload_multipart(s3, from_info.bucket, from_info.path) - S3FileSystem._copy(s3, from_info, to_info, {}) - - -def test_s3_isdir(tmp_dir, dvc, s3): - s3.gen({"data": {"foo": "foo"}}) - fs = S3FileSystem(dvc, s3.config) - - assert not fs.isdir(s3 / "data" / "foo") - assert fs.isdir(s3 / "data") - - -def test_s3_upload_fobj(tmp_dir, dvc, s3): - s3.gen({"data": {"foo": "foo"}}) - fs = S3FileSystem(dvc, s3.config) - - to_info = s3 / "data" / "bar" - with fs.open(s3 / "data" / "foo", "rb") as stream: - fs.upload_fobj(stream, to_info, 1) - - assert to_info.read_text() == "foo" - - -KB = 1024 -MB = KB ** 2 -GB = KB ** 3 - - -def test_s3_aws_config(tmp_dir, dvc, s3, monkeypatch): - config_directory = tmp_dir / ".aws" - config_directory.mkdir() - (config_directory / "config").write_text( - textwrap.dedent( - """\ - [default] - s3 = - max_concurrent_requests = 20000 - max_queue_size = 1000 - multipart_threshold = 1000KiB - multipart_chunksize = 64MB - use_accelerate_endpoint = true - addressing_style = path - """ - ) - ) - - if sys.platform == "win32": - var = "USERPROFILE" - else: - var = "HOME" - monkeypatch.setenv(var, str(tmp_dir)) - - # Fresh import to see the effects of changing HOME variable - s3_mod = importlib.reload(sys.modules[S3FileSystem.__module__]) - fs = s3_mod.S3FileSystem(dvc, s3.config) - assert fs._transfer_config is None - - with fs._get_s3() as s3: - s3_config = s3.meta.client.meta.config.s3 - assert s3_config["use_accelerate_endpoint"] - assert s3_config["addressing_style"] == "path" - - transfer_config = fs._transfer_config - assert transfer_config.max_io_queue_size == 1000 - assert transfer_config.multipart_chunksize == 64 * MB - assert transfer_config.multipart_threshold == 1000 * KB - assert transfer_config.max_request_concurrency == 20000 - - -def test_s3_aws_config_different_profile(tmp_dir, dvc, s3, monkeypatch): - config_file = tmp_dir / "aws_config.ini" - config_file.write_text( - textwrap.dedent( - """\ - [default] - extra = keys - s3 = - addressing_style = auto - use_accelerate_endpoint = true - multipart_threshold = ThisIsNotGoingToBeCasted! - [profile dev] - some_extra = keys - s3 = - addresing_style = virtual - multipart_threshold = 2GiB - """ - ) - ) - monkeypatch.setenv("AWS_CONFIG_FILE", config_file) - - fs = S3FileSystem(dvc, {**s3.config, "profile": "dev"}) - assert fs._transfer_config is None - - with fs._get_s3() as s3: - s3_config = s3.meta.client.meta.config.s3 - assert s3_config["addresing_style"] == "virtual" - assert "use_accelerate_endpoint" not in s3_config - - transfer_config = fs._transfer_config - assert transfer_config.multipart_threshold == 2 * GB diff --git a/tests/unit/fs/test_s3.py b/tests/unit/fs/test_s3.py index 5685342333..34f7f6bcbf 100644 --- a/tests/unit/fs/test_s3.py +++ b/tests/unit/fs/test_s3.py @@ -1,7 +1,10 @@ +import importlib +import sys +import textwrap + import pytest from dvc.config import ConfigError -from dvc.exceptions import DvcException from dvc.fs.s3 import S3FileSystem bucket_name = "bucket-name" @@ -39,13 +42,14 @@ def test_grants(dvc): } fs = S3FileSystem(dvc, config) + extra_args = fs.login_info["s3_additional_kwargs"] assert ( - fs.extra_args["GrantRead"] + extra_args["GrantRead"] == "id=read-permission-id,id=other-read-permission-id" ) - assert fs.extra_args["GrantReadACP"] == "id=read-acp-permission-id" - assert fs.extra_args["GrantWriteACP"] == "id=write-acp-permission-id" - assert fs.extra_args["GrantFullControl"] == "id=full-control-permission-id" + assert extra_args["GrantReadACP"] == "id=read-acp-permission-id" + assert extra_args["GrantWriteACP"] == "id=write-acp-permission-id" + assert extra_args["GrantFullControl"] == "id=full-control-permission-id" def test_grants_mutually_exclusive_acl_error(dvc, grants): @@ -58,7 +62,7 @@ def test_grants_mutually_exclusive_acl_error(dvc, grants): def test_sse_kms_key_id(dvc): fs = S3FileSystem(dvc, {"url": url, "sse_kms_key_id": "key"}) - assert fs.extra_args["SSEKMSKeyId"] == "key" + assert fs.login_info["s3_additional_kwargs"]["SSEKMSKeyId"] == "key" def test_key_id_and_secret(dvc): @@ -71,44 +75,70 @@ def test_key_id_and_secret(dvc): "session_token": session_token, }, ) - assert fs.access_key_id == key_id - assert fs.secret_access_key == key_secret - assert fs.session_token == session_token - - -def test_get_s3_no_credentials(mocker): - from botocore.exceptions import NoCredentialsError - - fs = S3FileSystem(None, {}) - with pytest.raises(DvcException, match="Unable to find AWS credentials"): - with fs._get_s3(): - raise NoCredentialsError - - -def test_get_s3_connection_error(mocker): - from botocore.exceptions import EndpointConnectionError - - fs = S3FileSystem(None, {}) - - msg = "Unable to connect to 'AWS S3'." - with pytest.raises(DvcException, match=msg): - with fs._get_s3(): - raise EndpointConnectionError(endpoint_url="url") - - -def test_get_s3_connection_error_endpoint(mocker): - from botocore.exceptions import EndpointConnectionError - - fs = S3FileSystem(None, {"endpointurl": "https://example.com"}) - - msg = "Unable to connect to 'https://example.com'." - with pytest.raises(DvcException, match=msg): - with fs._get_s3(): - raise EndpointConnectionError(endpoint_url="url") + assert fs.login_info["key"] == key_id + assert fs.login_info["secret"] == key_secret + assert fs.login_info["token"] == session_token + + +KB = 1024 +MB = KB ** 2 +GB = KB ** 3 + + +def test_s3_aws_config(tmp_dir, dvc, s3, monkeypatch): + config_directory = tmp_dir / ".aws" + config_directory.mkdir() + (config_directory / "config").write_text( + textwrap.dedent( + """\ + [default] + s3 = + multipart_chunksize = 64MB + use_accelerate_endpoint = true + addressing_style = path + """ + ) + ) + if sys.platform == "win32": + var = "USERPROFILE" + else: + var = "HOME" + monkeypatch.setenv(var, str(tmp_dir)) + + # Fresh import to see the effects of changing HOME variable + s3_mod = importlib.reload(sys.modules[S3FileSystem.__module__]) + fs = s3_mod.S3FileSystem(dvc, s3.config) + + s3_config = fs.login_info["config_kwargs"]["s3"] + assert s3_config["use_accelerate_endpoint"] + assert s3_config["addressing_style"] == "path" + assert fs._open_args["block_size"] == 64 * MB + + +def test_s3_aws_config_different_profile(tmp_dir, dvc, s3, monkeypatch): + config_file = tmp_dir / "aws_config.ini" + config_file.write_text( + textwrap.dedent( + """\ + [default] + extra = keys + s3 = + addressing_style = auto + use_accelerate_endpoint = true + multipart_threshold = ThisIsNotGoingToBeCasted! + [profile dev] + some_extra = keys + s3 = + addressing_style = virtual + multipart_threshold = 2GiB + """ + ) + ) + monkeypatch.setenv("AWS_CONFIG_FILE", config_file) -def test_get_bucket(): - fs = S3FileSystem(None, {"url": "s3://mybucket/path"}) - with pytest.raises(DvcException, match="Bucket 'mybucket' does not exist"): - with fs._get_bucket("mybucket") as bucket: - raise bucket.meta.client.exceptions.NoSuchBucket({}, None) + fs = S3FileSystem(dvc, {**s3.config, "profile": "dev"}) + s3_config = fs.login_info["config_kwargs"]["s3"] + assert "use_accelerate_endpoint" not in s3_config + assert s3_config["addressing_style"] == "virtual" + assert fs._open_args["block_size"] == 2 * GB diff --git a/tests/unit/remote/test_remote_tree.py b/tests/unit/remote/test_remote_tree.py index d012ad2781..8ad001637d 100644 --- a/tests/unit/remote/test_remote_tree.py +++ b/tests/unit/remote/test_remote_tree.py @@ -86,10 +86,12 @@ def test_walk_files(remote): @pytest.mark.parametrize("remote", [pytest.lazy_fixture("s3")], indirect=True) def test_copy_preserve_etag_across_buckets(remote, dvc): - s3 = remote.fs.s3 - s3.Bucket("another").create() + s3 = remote.fs + s3.fs.mkdir("another/") - another = S3FileSystem(dvc, {"url": "s3://another", "region": "us-east-1"}) + another = S3FileSystem( + dvc, {**remote.fs.config, "url": "s3://another", "region": "us-east-1"} + ) from_info = remote.fs.path_info / "foo" to_info = another.path_info / "foo" @@ -102,29 +104,6 @@ def test_copy_preserve_etag_across_buckets(remote, dvc): assert from_hash == to_hash -@pytest.mark.parametrize( - "remote", - [ - pytest.lazy_fixture("s3"), - pytest.param( - pytest.lazy_fixture("gs"), - marks=pytest.mark.xfail( - reason="https://github.com/iterative/dvc/issues/5521" - ), - ), - ], - indirect=True, -) -def test_makedirs(remote): - fs = remote.fs - empty_dir = remote.fs.path_info / "empty_dir" / "" - fs.remove(empty_dir) - assert not fs.exists(empty_dir) - fs.makedirs(empty_dir) - assert fs.exists(empty_dir) - assert fs.isdir(empty_dir) - - @pytest.mark.parametrize("remote", remotes, indirect=True) def test_isfile(remote): test_cases = [ From f5d753b732c922aca2318d2eed40d0fd5849594f Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Thu, 18 Mar 2021 11:10:41 +0300 Subject: [PATCH 06/30] setup: depend on s3fs --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f187675fdd..e4e1fbb57c 100644 --- a/setup.py +++ b/setup.py @@ -96,7 +96,7 @@ def run(self): gs = ["gcsfs>=0.7.2"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -s3 = ["boto3>=1.9.201"] +s3 = ["s3fs>=0.5.2"] azure = ["adlfs>=0.6.3", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome<3.9.9"] From c193dcf1813a6666a3396d8188af02cc5deecf87 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Thu, 18 Mar 2021 13:20:54 +0300 Subject: [PATCH 07/30] tests: normalize s3 tests with some cleanups --- tests/unit/fs/test_s3.py | 11 +++++++---- tests/unit/remote/test_remote.py | 9 +++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/unit/fs/test_s3.py b/tests/unit/fs/test_s3.py index 34f7f6bcbf..d66e60ee11 100644 --- a/tests/unit/fs/test_s3.py +++ b/tests/unit/fs/test_s3.py @@ -104,11 +104,14 @@ def test_s3_aws_config(tmp_dir, dvc, s3, monkeypatch): var = "USERPROFILE" else: var = "HOME" - monkeypatch.setenv(var, str(tmp_dir)) - # Fresh import to see the effects of changing HOME variable - s3_mod = importlib.reload(sys.modules[S3FileSystem.__module__]) - fs = s3_mod.S3FileSystem(dvc, s3.config) + with monkeypatch.context() as m: + m.setenv(var, str(tmp_dir)) + # Fresh import to see the effects of changing HOME variable + s3_mod = importlib.reload(sys.modules[S3FileSystem.__module__]) + fs = s3_mod.S3FileSystem(dvc, s3.config) + + importlib.reload(sys.modules[S3FileSystem.__module__]) s3_config = fs.login_info["config_kwargs"]["s3"] assert s3_config["use_accelerate_endpoint"] diff --git a/tests/unit/remote/test_remote.py b/tests/unit/remote/test_remote.py index e5a961ef95..c775115353 100644 --- a/tests/unit/remote/test_remote.py +++ b/tests/unit/remote/test_remote.py @@ -42,15 +42,12 @@ def test_remote_without_hash_jobs_default(dvc): assert fs.hash_jobs == fs.HASH_JOBS -@pytest.mark.parametrize( - "fs_name, fs_cls", [("fs", GSFileSystem), ("s3", S3FileSystem)] -) -def test_makedirs_not_create_for_top_level_path(fs_name, fs_cls, dvc, mocker): +@pytest.mark.parametrize("fs_cls", [GSFileSystem, S3FileSystem]) +def test_makedirs_not_create_for_top_level_path(fs_cls, dvc, mocker): url = f"{fs_cls.scheme}://bucket/" fs = fs_cls(dvc, {"url": url}) mocked_client = mocker.PropertyMock() - # we use remote clients with same name as scheme to interact with remote - mocker.patch.object(fs_cls, fs_name, mocked_client) + mocker.patch.object(fs_cls, "fs", mocked_client) fs.makedirs(fs.path_info) assert not mocked_client.called From 130bdd790b7bbe8c4e5decf9aa44fd6734cc231f Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 23 Mar 2021 12:33:42 +0300 Subject: [PATCH 08/30] Use _download/_upload with old implementation --- dvc/fs/s3.py | 103 +++++++++++++++++++++++++++++++++++++++++++++++++- dvc/logger.py | 1 + 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index 7e9473e60e..b24f910f21 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -1,18 +1,22 @@ import os import threading from collections import defaultdict +from contextlib import contextmanager from funcy import cached_property, wrap_prop +from dvc.exceptions import DvcException from dvc.path_info import CloudURLInfo +from dvc.progress import Tqdm from dvc.scheme import Schemes +from dvc.utils import error_link from .fsspec_wrapper import FSSpecWrapper _AWS_CONFIG_PATH = os.path.join(os.path.expanduser("~"), ".aws", "config") -class S3FileSystem(FSSpecWrapper): +class BaseS3FileSystem(FSSpecWrapper): scheme = Schemes.S3 PATH_CLS = CloudURLInfo REQUIRES = {"s3fs": "s3fs"} @@ -134,3 +138,100 @@ def open( return self.fs.open( self._with_bucket(path_info), mode=mode, **self._open_args ) + + +class S3FileSystem(BaseS3FileSystem): + @wrap_prop(threading.Lock()) + @cached_property + def s3(self): + import boto3 + + login_info = self.login_info + client_kwargs = login_info.get("client_kwargs", {}) + session_opts = { + "profile_name": login_info.get("profile"), + "region_name": client_kwargs.get("region_name"), + } + + if "key" in login_info: + session_opts["aws_access_key_id"] = login_info["key"] + if "secret" in login_info: + session_opts["aws_secret_access_key"] = login_info["secret"] + if "token" in login_info: + session_opts["aws_session_token"] = login_info["token"] + + session = boto3.session.Session(**session_opts) + # pylint: disable=attribute-defined-outside-init + self.endpoint_url = client_kwargs.get("endpoint_url") + return session.resource( + "s3", + endpoint_url=self.endpoint_url, + use_ssl=login_info["use_ssl"], + ) + + @contextmanager + def _get_s3(self): + from botocore.exceptions import ( + EndpointConnectionError, + NoCredentialsError, + ) + + try: + yield self.s3 + except NoCredentialsError as exc: + link = error_link("no-credentials") + raise DvcException( + f"Unable to find AWS credentials. {link}" + ) from exc + except EndpointConnectionError as exc: + link = error_link("connection-error") + name = self.endpoint_url or "AWS S3" + raise DvcException( + f"Unable to connect to '{name}'. {link}" + ) from exc + + @contextmanager + def _get_bucket(self, bucket): + with self._get_s3() as s3: + try: + yield s3.Bucket(bucket) + except s3.meta.client.exceptions.NoSuchBucket as exc: + link = error_link("no-bucket") + raise DvcException( + f"Bucket '{bucket}' does not exist. {link}" + ) from exc + + @contextmanager + def _get_obj(self, path_info): + with self._get_bucket(path_info.bucket) as bucket: + try: + yield bucket.Object(path_info.path) + except bucket.meta.client.exceptions.NoSuchKey as exc: + raise DvcException(f"{path_info.url} does not exist") from exc + + def _upload( + self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs + ): + with self._get_obj(to_info) as obj: + total = os.path.getsize(from_file) + with Tqdm( + disable=no_progress_bar, total=total, bytes=True, desc=name + ) as pbar: + obj.upload_file( + from_file, + Callback=pbar.update, + ExtraArgs=self.login_info.get("s3_additional_kwargs"), + ) + + def _download( + self, from_info, to_file, name=None, no_progress_bar=False, **pbar_args + ): + with self._get_obj(from_info) as obj: + with Tqdm( + disable=no_progress_bar, + total=obj.content_length, + bytes=True, + desc=name, + **pbar_args, + ) as pbar: + obj.download_file(to_file, Callback=pbar.update) diff --git a/dvc/logger.py b/dvc/logger.py index 396fb242e2..e2eb5cd8f8 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -196,6 +196,7 @@ def disable_other_loggers(): def setup(level=logging.INFO): colorama.init() + logging.getLogger("asyncio").setLevel(logging.CRITICAL) addLoggingLevel("TRACE", logging.DEBUG - 5) logging.config.dictConfig( From 8073f2021ee8c3fadc7420776d67ec4070333c90 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 23 Mar 2021 15:06:35 +0300 Subject: [PATCH 09/30] fsspec: cache .exists() results --- dvc/fs/fsspec_wrapper.py | 6 ++++-- dvc/fs/s3.py | 8 +------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/dvc/fs/fsspec_wrapper.py b/dvc/fs/fsspec_wrapper.py index 0470b5da62..a88ef183c3 100644 --- a/dvc/fs/fsspec_wrapper.py +++ b/dvc/fs/fsspec_wrapper.py @@ -91,7 +91,9 @@ def copy(self, from_info, to_info): self.fs.copy(self._with_bucket(from_info), self._with_bucket(to_info)) def exists(self, path_info, use_dvcignore=False): - return self.fs.exists(self._with_bucket(path_info)) + return self._with_bucket(path_info) in self.fs.ls( + self._with_bucket(path_info.parent) + ) def ls( self, path_info, detail=False, recursive=False @@ -107,7 +109,7 @@ def ls( yield from self._strip_buckets(files, detail, prefix=root) return None - yield from self._strip_buckets(self.ls(path, detail=detail), detail) + yield from self._strip_buckets(self.fs.ls(path, detail=detail), detail) def walk_files(self, path_info, **kwargs): for file in self.ls(path_info, recursive=True): diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index b24f910f21..e35bff977d 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -132,13 +132,6 @@ def fs(self): return _S3FileSystem(**self.login_info, skip_instance_cache=True) - def open( - self, path_info, mode="r", **kwargs - ): # pylint: disable=arguments-differ - return self.fs.open( - self._with_bucket(path_info), mode=mode, **self._open_args - ) - class S3FileSystem(BaseS3FileSystem): @wrap_prop(threading.Lock()) @@ -222,6 +215,7 @@ def _upload( Callback=pbar.update, ExtraArgs=self.login_info.get("s3_additional_kwargs"), ) + self.fs.invalidate_cache(self._with_bucket(to_info.parent)) def _download( self, from_info, to_file, name=None, no_progress_bar=False, **pbar_args From b1963a0bcc891d10a0b959b3b13d8d7988009642 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 23 Mar 2021 15:15:07 +0300 Subject: [PATCH 10/30] setup: pin the boto version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e4e1fbb57c..a3ac1c979a 100644 --- a/setup.py +++ b/setup.py @@ -96,7 +96,7 @@ def run(self): gs = ["gcsfs>=0.7.2"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -s3 = ["s3fs>=0.5.2"] +s3 = ["s3fs>=0.5.2", "boto3>=1.9.2012"] azure = ["adlfs>=0.6.3", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome<3.9.9"] From 28e60c7c048dffd0ce127da94bd154c410a775c7 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 23 Mar 2021 15:17:54 +0300 Subject: [PATCH 11/30] setup: install s3fs[boto3] --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a3ac1c979a..1d9de3898d 100644 --- a/setup.py +++ b/setup.py @@ -96,7 +96,7 @@ def run(self): gs = ["gcsfs>=0.7.2"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -s3 = ["s3fs>=0.5.2", "boto3>=1.9.2012"] +s3 = ["s3fs['boto3']>=0.5.2"] azure = ["adlfs>=0.6.3", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome<3.9.9"] From d13cb6a81fbf2f46f25d8c7a904eda784a5c251d Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 10:52:47 +0300 Subject: [PATCH 12/30] s3: proper exception management --- dvc/fs/s3.py | 123 ++++++++++++++++++++++----------------------------- 1 file changed, 54 insertions(+), 69 deletions(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index e35bff977d..9482cda3da 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -1,15 +1,12 @@ import os import threading from collections import defaultdict -from contextlib import contextmanager from funcy import cached_property, wrap_prop -from dvc.exceptions import DvcException from dvc.path_info import CloudURLInfo from dvc.progress import Tqdm from dvc.scheme import Schemes -from dvc.utils import error_link from .fsspec_wrapper import FSSpecWrapper @@ -37,23 +34,38 @@ def __init__(self, repo, config): self.path_info = self.PATH_CLS(url) self._open_args = {} + self._transfer_config = None self.login_info = self._prepare_credentials(config) + _TRANSFER_CONFIG_ALIASES = { + "max_queue_size": "max_io_queue", + "max_concurrent_requests": "max_concurrency", + "multipart_threshold": "multipart_threshold", + "multipart_chunksize": "multipart_chunksize", + } + def _split_s3_config(self, s3_config): """Splits the general s3 config into 2 different config objects, one for transfer.TransferConfig and other is the general session config""" + + from boto3.s3.transfer import TransferConfig + from dvc.utils import conversions - config = {} + config, transfer_config = {}, {} for key, value in s3_config.items(): - if key in {"multipart_chunksize", "multipart_threshold"}: - self._open_args[ - "block_size" - ] = conversions.human_readable_to_bytes(value) + if key in self._TRANSFER_CONFIG_ALIASES: + if key in {"multipart_chunksize", "multipart_threshold"}: + # cast human readable sizes (like 24MiB) to integers + value = conversions.human_readable_to_bytes(value) + else: + value = int(value) + transfer_config[self._TRANSFER_CONFIG_ALIASES[key]] = value else: config[key] = value + self._transfer_config = TransferConfig(**transfer_config) return config def _load_aws_config_file(self, profile): @@ -154,78 +166,51 @@ def s3(self): session_opts["aws_session_token"] = login_info["token"] session = boto3.session.Session(**session_opts) - # pylint: disable=attribute-defined-outside-init - self.endpoint_url = client_kwargs.get("endpoint_url") + return session.resource( "s3", - endpoint_url=self.endpoint_url, + endpoint_url=client_kwargs.get("endpoint_url"), use_ssl=login_info["use_ssl"], ) - @contextmanager - def _get_s3(self): - from botocore.exceptions import ( - EndpointConnectionError, - NoCredentialsError, - ) - - try: - yield self.s3 - except NoCredentialsError as exc: - link = error_link("no-credentials") - raise DvcException( - f"Unable to find AWS credentials. {link}" - ) from exc - except EndpointConnectionError as exc: - link = error_link("connection-error") - name = self.endpoint_url or "AWS S3" - raise DvcException( - f"Unable to connect to '{name}'. {link}" - ) from exc - - @contextmanager - def _get_bucket(self, bucket): - with self._get_s3() as s3: - try: - yield s3.Bucket(bucket) - except s3.meta.client.exceptions.NoSuchBucket as exc: - link = error_link("no-bucket") - raise DvcException( - f"Bucket '{bucket}' does not exist. {link}" - ) from exc - - @contextmanager def _get_obj(self, path_info): - with self._get_bucket(path_info.bucket) as bucket: - try: - yield bucket.Object(path_info.path) - except bucket.meta.client.exceptions.NoSuchKey as exc: - raise DvcException(f"{path_info.url} does not exist") from exc + try: + bucket = self.s3.Bucket(path_info.bucket) + return bucket.Object(path_info.path) + except Exception as exc: + from s3fs.errors import translate_boto_error + + raise translate_boto_error(exc) def _upload( - self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs + self, from_file, to_info, name=None, no_progress_bar=False, **pbar_args ): - with self._get_obj(to_info) as obj: - total = os.path.getsize(from_file) - with Tqdm( - disable=no_progress_bar, total=total, bytes=True, desc=name - ) as pbar: - obj.upload_file( - from_file, - Callback=pbar.update, - ExtraArgs=self.login_info.get("s3_additional_kwargs"), - ) + total = os.path.getsize(from_file) + with Tqdm( + disable=no_progress_bar, + total=total, + bytes=True, + desc=name, + **pbar_args, + ) as pbar: + obj = self._get_obj(to_info) + obj.upload_file( + from_file, + Callback=pbar.update, + ExtraArgs=self.login_info.get("s3_additional_kwargs"), + Config=self._transfer_config, + ) self.fs.invalidate_cache(self._with_bucket(to_info.parent)) def _download( self, from_info, to_file, name=None, no_progress_bar=False, **pbar_args ): - with self._get_obj(from_info) as obj: - with Tqdm( - disable=no_progress_bar, - total=obj.content_length, - bytes=True, - desc=name, - **pbar_args, - ) as pbar: - obj.download_file(to_file, Callback=pbar.update) + obj = self._get_obj(from_info) + with Tqdm( + disable=no_progress_bar, + total=obj.content_length, + bytes=True, + desc=name, + **pbar_args, + ) as pbar: + obj.download_file(to_file, Callback=pbar.update) From f46114802d05ea8e74785e8fb44839fc5084bf1c Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 11:18:54 +0300 Subject: [PATCH 13/30] s3: config improvements --- dvc/fs/s3.py | 13 ++++++------- tests/unit/fs/test_s3.py | 30 ++++++++++++++++++++---------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index 9482cda3da..aba374c149 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -33,10 +33,6 @@ def __init__(self, repo, config): url = config.get("url", "s3://") self.path_info = self.PATH_CLS(url) - self._open_args = {} - self._transfer_config = None - self.login_info = self._prepare_credentials(config) - _TRANSFER_CONFIG_ALIASES = { "max_queue_size": "max_io_queue", "max_concurrent_requests": "max_concurrency", @@ -65,12 +61,15 @@ def _split_s3_config(self, s3_config): else: config[key] = value + # pylint: disable=attribute-defined-outside-init self._transfer_config = TransferConfig(**transfer_config) return config def _load_aws_config_file(self, profile): from botocore.configloader import load_config + # pylint: disable=attribute-defined-outside-init + self._transfer_config = None config_path = os.environ.get("AWS_CONFIG_FILE", _AWS_CONFIG_PATH) if not os.path.exists(config_path): return None @@ -142,7 +141,7 @@ def _entry_hook(self, entry): def fs(self): from s3fs import S3FileSystem as _S3FileSystem - return _S3FileSystem(**self.login_info, skip_instance_cache=True) + return _S3FileSystem(**self.fs_args) class S3FileSystem(BaseS3FileSystem): @@ -151,7 +150,7 @@ class S3FileSystem(BaseS3FileSystem): def s3(self): import boto3 - login_info = self.login_info + login_info = self.fs_args client_kwargs = login_info.get("client_kwargs", {}) session_opts = { "profile_name": login_info.get("profile"), @@ -197,7 +196,7 @@ def _upload( obj.upload_file( from_file, Callback=pbar.update, - ExtraArgs=self.login_info.get("s3_additional_kwargs"), + ExtraArgs=self.fs_args.get("s3_additional_kwargs"), Config=self._transfer_config, ) self.fs.invalidate_cache(self._with_bucket(to_info.parent)) diff --git a/tests/unit/fs/test_s3.py b/tests/unit/fs/test_s3.py index d66e60ee11..742cb3cab6 100644 --- a/tests/unit/fs/test_s3.py +++ b/tests/unit/fs/test_s3.py @@ -42,7 +42,7 @@ def test_grants(dvc): } fs = S3FileSystem(dvc, config) - extra_args = fs.login_info["s3_additional_kwargs"] + extra_args = fs.fs_args["s3_additional_kwargs"] assert ( extra_args["GrantRead"] == "id=read-permission-id,id=other-read-permission-id" @@ -62,7 +62,7 @@ def test_grants_mutually_exclusive_acl_error(dvc, grants): def test_sse_kms_key_id(dvc): fs = S3FileSystem(dvc, {"url": url, "sse_kms_key_id": "key"}) - assert fs.login_info["s3_additional_kwargs"]["SSEKMSKeyId"] == "key" + assert fs.fs_args["s3_additional_kwargs"]["SSEKMSKeyId"] == "key" def test_key_id_and_secret(dvc): @@ -75,9 +75,9 @@ def test_key_id_and_secret(dvc): "session_token": session_token, }, ) - assert fs.login_info["key"] == key_id - assert fs.login_info["secret"] == key_secret - assert fs.login_info["token"] == session_token + assert fs.fs_args["key"] == key_id + assert fs.fs_args["secret"] == key_secret + assert fs.fs_args["token"] == session_token KB = 1024 @@ -93,6 +93,9 @@ def test_s3_aws_config(tmp_dir, dvc, s3, monkeypatch): """\ [default] s3 = + max_concurrent_requests = 20000 + max_queue_size = 1000 + multipart_threshold = 1000KiB multipart_chunksize = 64MB use_accelerate_endpoint = true addressing_style = path @@ -113,10 +116,15 @@ def test_s3_aws_config(tmp_dir, dvc, s3, monkeypatch): importlib.reload(sys.modules[S3FileSystem.__module__]) - s3_config = fs.login_info["config_kwargs"]["s3"] + s3_config = fs.fs_args["config_kwargs"]["s3"] assert s3_config["use_accelerate_endpoint"] assert s3_config["addressing_style"] == "path" - assert fs._open_args["block_size"] == 64 * MB + + transfer_config = fs._transfer_config + assert transfer_config.max_io_queue_size == 1000 + assert transfer_config.multipart_chunksize == 64 * MB + assert transfer_config.multipart_threshold == 1000 * KB + assert transfer_config.max_request_concurrency == 20000 def test_s3_aws_config_different_profile(tmp_dir, dvc, s3, monkeypatch): @@ -141,7 +149,9 @@ def test_s3_aws_config_different_profile(tmp_dir, dvc, s3, monkeypatch): monkeypatch.setenv("AWS_CONFIG_FILE", config_file) fs = S3FileSystem(dvc, {**s3.config, "profile": "dev"}) - s3_config = fs.login_info["config_kwargs"]["s3"] - assert "use_accelerate_endpoint" not in s3_config + + s3_config = fs.fs_args["config_kwargs"]["s3"] assert s3_config["addressing_style"] == "virtual" - assert fs._open_args["block_size"] == 2 * GB + + transfer_config = fs._transfer_config + assert transfer_config.multipart_threshold == 2 * GB From 2e55ad6385d8ed68ead5c6d399bbd89b503d3549 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 11:22:59 +0300 Subject: [PATCH 14/30] setup: reference without quotes --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 1d9de3898d..edb50aaee4 100644 --- a/setup.py +++ b/setup.py @@ -96,7 +96,7 @@ def run(self): gs = ["gcsfs>=0.7.2"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -s3 = ["s3fs['boto3']>=0.5.2"] +s3 = ["s3fs[boto3]>=0.5.2"] azure = ["adlfs>=0.6.3", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome<3.9.9"] From a3381db75604bed655d0a30bad58d137c6577775 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 11:33:54 +0300 Subject: [PATCH 15/30] setup: install boto separately --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index edb50aaee4..e06a442c8f 100644 --- a/setup.py +++ b/setup.py @@ -96,7 +96,7 @@ def run(self): gs = ["gcsfs>=0.7.2"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -s3 = ["s3fs[boto3]>=0.5.2"] +s3 = ["s3fs>=0.5.2", "boto3==1.16.52"] azure = ["adlfs>=0.6.3", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome<3.9.9"] From 4ebe85baebd8cbb2fc502917fd9d534d853d023e Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 11:46:52 +0300 Subject: [PATCH 16/30] setup: install moto server --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e06a442c8f..c5a17f80f8 100644 --- a/setup.py +++ b/setup.py @@ -135,7 +135,7 @@ def run(self): "pydocstyle<4.0", "jaraco.windows==3.9.2", "mock-ssh-server>=0.8.2", - "moto==1.3.16.dev122", + "moto[server]==1.3.16.dev122", # moto's indirect dependency that is causing problems with flufl.lock's # dependency (atpublic). See https://github.com/iterative/dvc/pull/4853 "aws-sam-translator<1.29.0", From 1664d75e2b45c2bfe4908ff5634d91bcbe920fef Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 13:06:55 +0300 Subject: [PATCH 17/30] s3: mocking adjustments for moto_server --- tests/remotes/__init__.py | 9 ++++++++- tests/remotes/s3.py | 27 ++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/tests/remotes/__init__.py b/tests/remotes/__init__.py index d91692d63c..a901bb781f 100644 --- a/tests/remotes/__init__.py +++ b/tests/remotes/__init__.py @@ -24,7 +24,14 @@ oss_server, real_oss, ) -from .s3 import S3, TEST_AWS_REPO_BUCKET, real_s3, s3, s3_server # noqa: F401 +from .s3 import ( # noqa: F401 + S3, + TEST_AWS_REPO_BUCKET, + real_s3, + s3, + s3_fake_creds_file, + s3_server, +) from .ssh import ( # noqa: F401; noqa: F401 SSHMocked, ssh, diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index b38a2ee79d..9505856697 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -101,6 +101,31 @@ def read_text(self, encoding=None, errors=None): return self.read_bytes().decode(encoding) +@pytest.fixture +def s3_fake_creds_file(monkeypatch): + # https://github.com/spulec/moto#other-caveats + import pathlib + + aws_dir = pathlib.Path("~").expanduser() / ".aws" + aws_dir.mkdir(exist_ok=True) + + aws_creds = aws_dir / "credentials" + exists = aws_creds.exists() + + if not exists: + aws_creds.touch() + + with monkeypatch.context() as m: + m.setenv("AWS_ACCESS_KEY_ID", "testing") + m.setenv("AWS_SECRET_ACCESS_KEY", "testing") + m.setenv("AWS_SECURITY_TOKEN", "testing") + m.setenv("AWS_SESSION_TOKEN", "testing") + yield + + if not exists: + aws_creds.unlink() + + # Due to moto being uncompatible with aioboto on wrapper # mode, we start the moto server as a subprocess (imitates # a real S3 service) and then create a client that uses @@ -139,7 +164,7 @@ def s3_server(test_config): @pytest.fixture -def s3(s3_server): +def s3(s3_server, s3_fake_creds_file): workspace = S3(S3.get_url()) workspace._s3.create_bucket(Bucket=TEST_AWS_REPO_BUCKET) yield workspace From 75318254652d15231705f6b4e89585a3d970a950 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 13:20:24 +0300 Subject: [PATCH 18/30] s3: mark some fixtures with scope=session --- tests/remotes/s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index 9505856697..9396a3f6ff 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -101,7 +101,7 @@ def read_text(self, encoding=None, errors=None): return self.read_bytes().decode(encoding) -@pytest.fixture +@pytest.fixture(scope="session") def s3_fake_creds_file(monkeypatch): # https://github.com/spulec/moto#other-caveats import pathlib @@ -133,7 +133,7 @@ def s3_fake_creds_file(monkeypatch): # # Originally adopted from: # https://github.com/dask/s3fs/blob/main/s3fs/tests/test_s3fs.py#L66-L86 -@pytest.fixture +@pytest.fixture(scope="session") def s3_server(test_config): test_config.requires("s3") From c29b06fd91102fb2dbdbdd149aea0f3688f15420 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 13:52:01 +0300 Subject: [PATCH 19/30] s3: s3_fake_creds_file scopes to func --- tests/remotes/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index 9396a3f6ff..c8e60afd7a 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -101,7 +101,7 @@ def read_text(self, encoding=None, errors=None): return self.read_bytes().decode(encoding) -@pytest.fixture(scope="session") +@pytest.fixture def s3_fake_creds_file(monkeypatch): # https://github.com/spulec/moto#other-caveats import pathlib From 6288f54e1d6719cd3f6af6af5c1e10a70dee12ef Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 15:08:29 +0300 Subject: [PATCH 20/30] fsspec: add some docs regarding exists() optimization --- dvc/fs/fsspec_wrapper.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/dvc/fs/fsspec_wrapper.py b/dvc/fs/fsspec_wrapper.py index a88ef183c3..b6a062c6dc 100644 --- a/dvc/fs/fsspec_wrapper.py +++ b/dvc/fs/fsspec_wrapper.py @@ -91,9 +91,17 @@ def copy(self, from_info, to_info): self.fs.copy(self._with_bucket(from_info), self._with_bucket(to_info)) def exists(self, path_info, use_dvcignore=False): - return self._with_bucket(path_info) in self.fs.ls( - self._with_bucket(path_info.parent) - ) + # Some implementations don't use ls under the hood + # which is not an efficient behavior on DVC considering that + # we usually to a lot of stat calls. For increasing the cache + # construction, we will first try to see whether that directory + # exists by doing an ls() on it's parent, and if that fails we + # would fall back to the original implementation. + raw_path = self._with_bucket(path_info) + try: + return raw_path in self.fs.ls(self._with_bucket(path_info.parent)) + except OSError: + return self.fs.exists(raw_path) def ls( self, path_info, detail=False, recursive=False From f0451ecaf1d7266894590a8182c0a8071ea087a6 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 24 Mar 2021 15:29:55 +0300 Subject: [PATCH 21/30] fsspec: exists() fallback --- dvc/fs/fsspec_wrapper.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dvc/fs/fsspec_wrapper.py b/dvc/fs/fsspec_wrapper.py index b6a062c6dc..ee3f31ccac 100644 --- a/dvc/fs/fsspec_wrapper.py +++ b/dvc/fs/fsspec_wrapper.py @@ -92,11 +92,11 @@ def copy(self, from_info, to_info): def exists(self, path_info, use_dvcignore=False): # Some implementations don't use ls under the hood - # which is not an efficient behavior on DVC considering that - # we usually to a lot of stat calls. For increasing the cache - # construction, we will first try to see whether that directory - # exists by doing an ls() on it's parent, and if that fails we - # would fall back to the original implementation. + # which is not an efficient behavior when considering that DVC + # does a lot of existence checks. For increasing the efficiency + # we will first try to infer the existence by looking at the parent + # directory's contents, and if that fails we will fallback to using + # normal exists() call. raw_path = self._with_bucket(path_info) try: return raw_path in self.fs.ls(self._with_bucket(path_info.parent)) From 5d2c3c0e07799fd14feffdea9f49d3c92577b84e Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Thu, 25 Mar 2021 12:18:12 +0300 Subject: [PATCH 22/30] fsspec: revert exists() optimization --- dvc/fs/fsspec_wrapper.py | 14 ++------------ dvc/fs/s3.py | 25 ++++++++++++++++++------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/dvc/fs/fsspec_wrapper.py b/dvc/fs/fsspec_wrapper.py index ee3f31ccac..0470b5da62 100644 --- a/dvc/fs/fsspec_wrapper.py +++ b/dvc/fs/fsspec_wrapper.py @@ -91,17 +91,7 @@ def copy(self, from_info, to_info): self.fs.copy(self._with_bucket(from_info), self._with_bucket(to_info)) def exists(self, path_info, use_dvcignore=False): - # Some implementations don't use ls under the hood - # which is not an efficient behavior when considering that DVC - # does a lot of existence checks. For increasing the efficiency - # we will first try to infer the existence by looking at the parent - # directory's contents, and if that fails we will fallback to using - # normal exists() call. - raw_path = self._with_bucket(path_info) - try: - return raw_path in self.fs.ls(self._with_bucket(path_info.parent)) - except OSError: - return self.fs.exists(raw_path) + return self.fs.exists(self._with_bucket(path_info)) def ls( self, path_info, detail=False, recursive=False @@ -117,7 +107,7 @@ def ls( yield from self._strip_buckets(files, detail, prefix=root) return None - yield from self._strip_buckets(self.fs.ls(path, detail=detail), detail) + yield from self._strip_buckets(self.ls(path, detail=detail), detail) def walk_files(self, path_info, **kwargs): for file in self.ls(path_info, recursive=True): diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index aba374c149..d020a1678e 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -1,3 +1,4 @@ +import functools import os import threading from collections import defaultdict @@ -144,6 +145,19 @@ def fs(self): return _S3FileSystem(**self.fs_args) +def _translate_exceptions(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except Exception as exc: + from s3fs.errors import translate_boto_error + + raise translate_boto_error(exc) + + return wrapper + + class S3FileSystem(BaseS3FileSystem): @wrap_prop(threading.Lock()) @cached_property @@ -173,14 +187,10 @@ def s3(self): ) def _get_obj(self, path_info): - try: - bucket = self.s3.Bucket(path_info.bucket) - return bucket.Object(path_info.path) - except Exception as exc: - from s3fs.errors import translate_boto_error - - raise translate_boto_error(exc) + bucket = self.s3.Bucket(path_info.bucket) + return bucket.Object(path_info.path) + @_translate_exceptions def _upload( self, from_file, to_info, name=None, no_progress_bar=False, **pbar_args ): @@ -201,6 +211,7 @@ def _upload( ) self.fs.invalidate_cache(self._with_bucket(to_info.parent)) + @_translate_exceptions def _download( self, from_info, to_file, name=None, no_progress_bar=False, **pbar_args ): From e1f4032cf758b8e883d3117068e2da35d184e3b0 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Fri, 2 Apr 2021 11:49:08 +0300 Subject: [PATCH 23/30] tests: ssl_verify => verify --- tests/unit/fs/test_s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/fs/test_s3.py b/tests/unit/fs/test_s3.py index 64d1045014..938c239b0b 100644 --- a/tests/unit/fs/test_s3.py +++ b/tests/unit/fs/test_s3.py @@ -38,14 +38,14 @@ def test_verify_ssl_default_param(dvc): } fs = S3FileSystem(dvc, config) - assert fs.fs_args["client_kwargs"]["ssl_verify"] + assert fs.fs_args["client_kwargs"]["verify"] def test_ssl_verify_bool_param(dvc): config = {"url": url, "ssl_verify": False} fs = S3FileSystem(dvc, config) - assert fs.fs_args["client_kwargs"]["ssl_verify"] == config["ssl_verify"] + assert fs.fs_args["client_kwargs"]["verify"] == config["ssl_verify"] def test_grants(dvc): From 8ac98ac57112295f044c13053445fc7b31a8a556 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 7 Apr 2021 16:39:27 +0300 Subject: [PATCH 24/30] setup: temporarily fetch from master --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b4623f734f..bf60bebda5 100644 --- a/setup.py +++ b/setup.py @@ -97,7 +97,8 @@ def run(self): gs = ["gcsfs>=0.7.2"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -s3 = ["s3fs>=0.5.2", "boto3==1.16.52"] +# temporary dependency to fetch from master +s3 = ["https://github.com/dask/s3fs.git", "boto3==1.16.52"] azure = ["adlfs>=0.6.3", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome<3.9.9"] From 82821cfbf8992ec12dcac4574b64c818bc71d811 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Mon, 12 Apr 2021 10:58:00 +0300 Subject: [PATCH 25/30] s3: add boto as a requirement --- dvc/fs/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/fs/s3.py b/dvc/fs/s3.py index 3131849ae0..b5cc9bf69d 100644 --- a/dvc/fs/s3.py +++ b/dvc/fs/s3.py @@ -17,7 +17,7 @@ class BaseS3FileSystem(FSSpecWrapper): scheme = Schemes.S3 PATH_CLS = CloudURLInfo - REQUIRES = {"s3fs": "s3fs"} + REQUIRES = {"s3fs": "s3fs", "boto3": "boto3"} PARAM_CHECKSUM = "etag" DETAIL_FIELDS = frozenset(("etag", "size")) From dbfd44f3dadad5afd1c28cf2020cf185bf82270f Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Mon, 12 Apr 2021 11:09:14 +0300 Subject: [PATCH 26/30] tests: do an extra check regarding race conditions --- tests/remotes/s3.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index c8e60afd7a..348f3ed597 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -110,9 +110,9 @@ def s3_fake_creds_file(monkeypatch): aws_dir.mkdir(exist_ok=True) aws_creds = aws_dir / "credentials" - exists = aws_creds.exists() + initially_exists = aws_creds.exists() - if not exists: + if not initially_exists: aws_creds.touch() with monkeypatch.context() as m: @@ -122,7 +122,7 @@ def s3_fake_creds_file(monkeypatch): m.setenv("AWS_SESSION_TOKEN", "testing") yield - if not exists: + if aws_creds.exists() and not initially_exists: aws_creds.unlink() From 8790eb3dc0c444117c1ec2c6557689c7b1ec03fc Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 13 Apr 2021 12:10:43 +0300 Subject: [PATCH 27/30] tests: use motoserver docker image --- tests/docker-compose.yml | 4 ++++ tests/remotes/s3.py | 52 ++++++++++++++-------------------------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 6cd84986b4..682932bf0b 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -6,6 +6,10 @@ services: command: azurite -L -l /data --blobHost 0.0.0.0 --queueHost 0.0.0.0 ports: - "10000" + motoserver: + image: motoserver/moto + ports: + - "5000:5000" oss: image: rkuprieiev/oss-emulator ports: diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index 348f3ed597..aa45efc958 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -1,6 +1,5 @@ import locale import os -import time import uuid import pytest @@ -12,18 +11,17 @@ from .base import Base TEST_AWS_REPO_BUCKET = os.environ.get("DVC_TEST_AWS_REPO_BUCKET", "dvc-temp") - -TEST_AWS_S3_PORT = 5555 -TEST_AWS_ENDPOINT_URL = f"http://127.0.0.1:{TEST_AWS_S3_PORT}/" +TEST_AWS_ENDPOINT_URL = "http://127.0.0.1:{port}/" class S3(Base, CloudURLInfo): IS_OBJECT_STORAGE = True + TEST_AWS_ENDPOINT_URL = None @cached_property def config(self): - return {"url": self.url, "endpointurl": TEST_AWS_ENDPOINT_URL} + return {"url": self.url, "endpointurl": self.TEST_AWS_ENDPOINT_URL} @staticmethod def should_test(): @@ -56,7 +54,7 @@ def get_url(): def _s3(self): import boto3 - return boto3.client("s3", endpoint_url=TEST_AWS_ENDPOINT_URL) + return boto3.client("s3", endpoint_url=self.config["endpointurl"]) def is_file(self): from botocore.exceptions import ClientError @@ -126,41 +124,27 @@ def s3_fake_creds_file(monkeypatch): aws_creds.unlink() -# Due to moto being uncompatible with aioboto on wrapper -# mode, we start the moto server as a subprocess (imitates -# a real S3 service) and then create a client that uses -# this server. -# -# Originally adopted from: -# https://github.com/dask/s3fs/blob/main/s3fs/tests/test_s3fs.py#L66-L86 @pytest.fixture(scope="session") -def s3_server(test_config): - test_config.requires("s3") - - import shlex - import subprocess - +def s3_server(test_config, docker_compose, docker_services): import requests - proc = subprocess.Popen( - shlex.split(f"moto_server s3 -p {TEST_AWS_S3_PORT}") - ) + test_config.requires("s3") + + port = docker_services.port_for("motoserver", 5000) + endpoint_url = TEST_AWS_ENDPOINT_URL.format(port=port) - timeout = 5 - while timeout > 0: + def _check(): try: - r = requests.get(TEST_AWS_ENDPOINT_URL) - if r.ok: - break + r = requests.get(endpoint_url) + return r.ok except requests.RequestException: - pass - timeout -= 0.1 - time.sleep(0.1) - - yield + return False - proc.terminate() - proc.wait() + docker_services.wait_until_responsive( + timeout=60.0, pause=0.1, check=_check + ) + S3.TEST_AWS_ENDPOINT_URL = endpoint_url + return endpoint_url @pytest.fixture From c541ddf2e010dcd7d514f3501b82e468d2fb4e13 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 13 Apr 2021 15:31:26 +0300 Subject: [PATCH 28/30] tests: wrap config deletion with try/finally --- tests/docker-compose.yml | 2 +- tests/remotes/s3.py | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 682932bf0b..22458d5280 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -9,7 +9,7 @@ services: motoserver: image: motoserver/moto ports: - - "5000:5000" + - "5000" oss: image: rkuprieiev/oss-emulator ports: diff --git a/tests/remotes/s3.py b/tests/remotes/s3.py index aa45efc958..d52cb0afe8 100644 --- a/tests/remotes/s3.py +++ b/tests/remotes/s3.py @@ -113,15 +113,16 @@ def s3_fake_creds_file(monkeypatch): if not initially_exists: aws_creds.touch() - with monkeypatch.context() as m: - m.setenv("AWS_ACCESS_KEY_ID", "testing") - m.setenv("AWS_SECRET_ACCESS_KEY", "testing") - m.setenv("AWS_SECURITY_TOKEN", "testing") - m.setenv("AWS_SESSION_TOKEN", "testing") - yield - - if aws_creds.exists() and not initially_exists: - aws_creds.unlink() + try: + with monkeypatch.context() as m: + m.setenv("AWS_ACCESS_KEY_ID", "testing") + m.setenv("AWS_SECRET_ACCESS_KEY", "testing") + m.setenv("AWS_SECURITY_TOKEN", "testing") + m.setenv("AWS_SESSION_TOKEN", "testing") + yield + finally: + if aws_creds.exists() and not initially_exists: + aws_creds.unlink() @pytest.fixture(scope="session") From 5b0dc6f332addb39e5b58621bad1e164a93ea85f Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Thu, 15 Apr 2021 12:13:22 +0300 Subject: [PATCH 29/30] tests: disable s3 by default --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 4a000c5b4f..908119b4f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,7 @@ "hdfs": False, "http": True, "oss": False, - "s3": True, + "s3": False, "ssh": True, "webdav": True, } From 36c0c59fdec084239e68180bd42cdb69fa866a66 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 4 May 2021 16:42:19 +0300 Subject: [PATCH 30/30] setup: upgrade fsspec --- setup.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 09b78000b5..8d36fcfa59 100644 --- a/setup.py +++ b/setup.py @@ -88,17 +88,16 @@ def run(self): "python-benedict>=0.21.1", "pyparsing==2.4.7", "typing_extensions>=3.7.4", - "fsspec==0.9.0", + "fsspec==2021.4.0", "diskcache>=5.2.1", ] # Extra dependencies for remote integrations -gs = ["gcsfs==0.8.0"] +gs = ["gcsfs==2021.4.0"] gdrive = ["pydrive2>=1.8.1", "six >= 1.13.0"] -# temporary dependency to fetch from master -s3 = ["s3fs @ git+https://github.com/dask/s3fs.git", "boto3==1.16.52"] +s3 = ["s3fs==2021.4.0", "boto3==1.16.52"] azure = ["adlfs==0.7.1", "azure-identity>=1.4.0", "knack"] # https://github.com/Legrandin/pycryptodome/issues/465 oss = ["oss2==2.6.1", "pycryptodome>=3.10"]