From 3e74b21c8ee0e9d63b81dbdbcb8e71401ec53652 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 16 Mar 2021 13:25:45 +0300 Subject: [PATCH 1/8] copy: implement preserve_etag mode --- s3fs/core.py | 60 ++++++++++++++++++++++++++++++++++++++--- s3fs/tests/test_s3fs.py | 29 ++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 508bdcbc..59eb8c11 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -1311,6 +1311,49 @@ async def _copy_basic(self, path1, path2, **kwargs): raise ValueError("Copy failed (%r -> %r): %s" % (path1, path2, e)) from e self.invalidate_cache(path2) + async def _copy_etag_preserved(self, path1, path2, size, total_parts, **kwargs): + """Copy file between locations on S3 as multi-part while preserving + the etag (using the same part sizes for each part""" + + bucket1, key1, version1 = self.split_path(path1) + bucket2, key2, version2 = self.split_path(path2) + + mpu = await self._call_s3( + self.s3.create_multipart_upload, Bucket=bucket2, Key=key2, **kwargs + ) + + parts = [] + brange_first = 0 + for i in range(1, total_parts + 1): + part_info = await self._call_s3( + self.s3.head_object, Bucket=bucket1, Key=key1, PartNumber=i + ) + part_size = part_info["ContentLength"] + brange_last = brange_first + part_size - 1 + if brange_last > size: + brange_last = size - 1 + + part = await self._call_s3( + self.s3.upload_part_copy, + Bucket=bucket2, + Key=key2, + PartNumber=i, + UploadId=mpu["UploadId"], + CopySource=path1, + CopySourceRange="bytes=%i-%i" % (brange_first, brange_last), + ) + parts.append({"PartNumber": i, "ETag": part["CopyPartResult"]["ETag"]}) + brange_first += part_size + + await self._call_s3( + self.s3.complete_multipart_upload, + Bucket=bucket2, + Key=key2, + UploadId=mpu["UploadId"], + MultipartUpload={"Parts": parts}, + ) + self.invalidate_cache(path2) + async def _copy_managed(self, path1, path2, size, block=5 * 2 ** 30, **kwargs): """Copy file between locations on S3 as multi-part @@ -1351,14 +1394,23 @@ async def _copy_managed(self, path1, path2, size, block=5 * 2 ** 30, **kwargs): ) self.invalidate_cache(path2) - async def _cp_file(self, path1, path2, **kwargs): - gb5 = 5 * 2 ** 30 + async def _cp_file(self, path1, path2, preserve_etag=None, **kwargs): + threshold = kwargs.pop("managed_threshold", 5 * 2 ** 30) path1 = self._strip_protocol(path1) bucket, key, vers = self.split_path(path1) - size = (await self._info(path1, bucket, key, version_id=vers))["size"] - if size <= gb5: + + info = await self._info(path1, bucket, key, version_id=vers) + size = info["size"] + if size <= threshold: # simple copy allowed for <5GB await self._copy_basic(path1, path2, **kwargs) + elif preserve_etag: + # etag preserving multipart copy + _, _, parts_suffix = info["ETag"].strip('"').partition("-") + assert parts_suffix + await self._copy_etag_preserved( + path1, path2, size, total_parts=int(parts_suffix) + ) else: # serial multipart copy await self._copy_managed(path1, path2, size, **kwargs) diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index dbe14f1a..8d9a4e74 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -7,6 +7,7 @@ from concurrent.futures import ProcessPoolExecutor import io import os +import random import requests import time import sys @@ -1921,3 +1922,31 @@ def create_file(content: bytes): create_file(content2) with expect_errno(errno.EBUSY): f.read() + + +def test_s3fs_etag_preserving_multipart_copy(s3): + test_file1 = test_bucket_name + "/test/multipart-upload.txt" + test_file2 = test_bucket_name + "/test/multipart-upload-copy.txt" + + with s3.open(test_file1, "wb", block_size=5 * 2 ** 21) as stream: + for _ in range(5): + stream.write(b"b" * (stream.blocksize + random.randrange(200))) + + file_1 = s3.info(test_file1) + + s3.copy(test_file1, test_file2, managed_threshold=5 * 2 ** 20) + file_2 = s3.info(test_file2) + s3.rm(test_file2) + + # normal copy() uses a block size of 5GB + assert file_1["ETag"] != file_2["ETag"] + + s3.copy(test_file1, test_file2, managed_threshold=5 * 2 ** 20, preserve_etag=True) + file_2 = s3.info(test_file2) + s3.rm(test_file2) + + # etag preserving copy() determines each part size for the destination + # by checking out the matching part's size on the source + assert file_1["ETag"] == file_2["ETag"] + + s3.rm(test_file1) From 9c9e13e80ada8e797157a8a6364bfdb5e09306cb Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 16 Mar 2021 22:37:33 +0300 Subject: [PATCH 2/8] Use a global instead of an implicit keyword argument --- s3fs/core.py | 4 ++-- s3fs/tests/test_s3fs.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 59eb8c11..d980efec 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -36,6 +36,7 @@ def setup_logging(level=None): if "S3FS_LOGGING_LEVEL" in os.environ: setup_logging() +MANAGED_COPY_THRESHOLD = 5 * 2 ** 30 S3_RETRYABLE_ERRORS = (socket.timeout,) _VALID_FILE_MODES = {"r", "w", "a", "rb", "wb", "ab"} @@ -1395,13 +1396,12 @@ async def _copy_managed(self, path1, path2, size, block=5 * 2 ** 30, **kwargs): self.invalidate_cache(path2) async def _cp_file(self, path1, path2, preserve_etag=None, **kwargs): - threshold = kwargs.pop("managed_threshold", 5 * 2 ** 30) path1 = self._strip_protocol(path1) bucket, key, vers = self.split_path(path1) info = await self._info(path1, bucket, key, version_id=vers) size = info["size"] - if size <= threshold: + if size <= MANAGED_COPY_THRESHOLD: # simple copy allowed for <5GB await self._copy_basic(path1, path2, **kwargs) elif preserve_etag: diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index 8d9a4e74..d6be2bc4 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -15,6 +15,7 @@ import moto from itertools import chain import fsspec.core +import s3fs.core from s3fs.core import S3FileSystem from s3fs.utils import ignoring, SSEParams from botocore.exceptions import NoCredentialsError @@ -1924,7 +1925,11 @@ def create_file(content: bytes): f.read() -def test_s3fs_etag_preserving_multipart_copy(s3): +def test_s3fs_etag_preserving_multipart_copy(monkeypatch, s3): + # Set this to a lower value so that we can actually + # test this without creating giant objects in memory + monkeypatch.setattr(s3fs.core, 'MANAGED_COPY_THRESHOLD', 5 * 2 ** 20) + test_file1 = test_bucket_name + "/test/multipart-upload.txt" test_file2 = test_bucket_name + "/test/multipart-upload-copy.txt" @@ -1934,14 +1939,14 @@ def test_s3fs_etag_preserving_multipart_copy(s3): file_1 = s3.info(test_file1) - s3.copy(test_file1, test_file2, managed_threshold=5 * 2 ** 20) + s3.copy(test_file1, test_file2) file_2 = s3.info(test_file2) s3.rm(test_file2) # normal copy() uses a block size of 5GB assert file_1["ETag"] != file_2["ETag"] - s3.copy(test_file1, test_file2, managed_threshold=5 * 2 ** 20, preserve_etag=True) + s3.copy(test_file1, test_file2, preserve_etag=True) file_2 = s3.info(test_file2) s3.rm(test_file2) From a26e0447c8d753eaefcf9c9c8636cc6d929a4642 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 16 Mar 2021 22:37:46 +0300 Subject: [PATCH 3/8] lint --- s3fs/tests/test_s3fs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index d6be2bc4..e1d253d5 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -1928,8 +1928,8 @@ def create_file(content: bytes): def test_s3fs_etag_preserving_multipart_copy(monkeypatch, s3): # Set this to a lower value so that we can actually # test this without creating giant objects in memory - monkeypatch.setattr(s3fs.core, 'MANAGED_COPY_THRESHOLD', 5 * 2 ** 20) - + monkeypatch.setattr(s3fs.core, "MANAGED_COPY_THRESHOLD", 5 * 2 ** 20) + test_file1 = test_bucket_name + "/test/multipart-upload.txt" test_file2 = test_bucket_name + "/test/multipart-upload-copy.txt" From ff515dd09b049edba50f65308bf71d178531b709 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 16 Mar 2021 23:24:38 +0300 Subject: [PATCH 4/8] separate gather logic --- s3fs/core.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index d980efec..697e3787 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -1322,13 +1322,18 @@ async def _copy_etag_preserved(self, path1, path2, size, total_parts, **kwargs): mpu = await self._call_s3( self.s3.create_multipart_upload, Bucket=bucket2, Key=key2, **kwargs ) + part_infos = await asyncio.gather(*[ + self._call_s3( + self.s3.head_object, + Bucket=bucket1, + Key=key1, + PartNumber=i + ) for i in range(1, total_parts + 1) + ]) parts = [] brange_first = 0 - for i in range(1, total_parts + 1): - part_info = await self._call_s3( - self.s3.head_object, Bucket=bucket1, Key=key1, PartNumber=i - ) + for i, part_info in enumerate(part_infos, 1): part_size = part_info["ContentLength"] brange_last = brange_first + part_size - 1 if brange_last > size: From 64b3ef516684866bf09f1fdbdffa6c47eb3a78d9 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 16 Mar 2021 23:24:49 +0300 Subject: [PATCH 5/8] some cosmetic changes --- s3fs/core.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 697e3787..5c4e8e2f 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -1322,14 +1322,14 @@ async def _copy_etag_preserved(self, path1, path2, size, total_parts, **kwargs): mpu = await self._call_s3( self.s3.create_multipart_upload, Bucket=bucket2, Key=key2, **kwargs ) - part_infos = await asyncio.gather(*[ - self._call_s3( - self.s3.head_object, - Bucket=bucket1, - Key=key1, - PartNumber=i - ) for i in range(1, total_parts + 1) - ]) + part_infos = await asyncio.gather( + *[ + self._call_s3( + self.s3.head_object, Bucket=bucket1, Key=key1, PartNumber=i + ) + for i in range(1, total_parts + 1) + ] + ) parts = [] brange_first = 0 From 23bd758258c0e49a223160b98d852303cb2620ff Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 17 Mar 2021 00:02:51 +0300 Subject: [PATCH 6/8] add basic docs --- s3fs/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/s3fs/core.py b/s3fs/core.py index 5c4e8e2f..909d200c 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -1401,6 +1401,7 @@ async def _copy_managed(self, path1, path2, size, block=5 * 2 ** 30, **kwargs): self.invalidate_cache(path2) async def _cp_file(self, path1, path2, preserve_etag=None, **kwargs): + """Copy file between locations on S3.""" path1 = self._strip_protocol(path1) bucket, key, vers = self.split_path(path1) From 2052fbb02f2b11a9300b777d1cc48b5f833680a7 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 17 Mar 2021 00:12:06 +0300 Subject: [PATCH 7/8] add docs --- s3fs/core.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 909d200c..b4a61a7b 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -1401,23 +1401,35 @@ async def _copy_managed(self, path1, path2, size, block=5 * 2 ** 30, **kwargs): self.invalidate_cache(path2) async def _cp_file(self, path1, path2, preserve_etag=None, **kwargs): - """Copy file between locations on S3.""" + """Copy file between locations on S3. + + preserve_etag: bool + Whether to preserve etag while copying. If the file is uploaded + as a single part, then it will be always equalivent to the md5 + hash of the file hence etag will always be preserved. But if the + file is uploaded in multi parts, then this option will try to + reproduce the same multipart upload while copying and preserve + the generated etag. + """ path1 = self._strip_protocol(path1) bucket, key, vers = self.split_path(path1) info = await self._info(path1, bucket, key, version_id=vers) size = info["size"] - if size <= MANAGED_COPY_THRESHOLD: - # simple copy allowed for <5GB - await self._copy_basic(path1, path2, **kwargs) - elif preserve_etag: - # etag preserving multipart copy - _, _, parts_suffix = info["ETag"].strip('"').partition("-") - assert parts_suffix + + _, _, parts_suffix = info["ETag"].strip('"').partition("-") + if preserve_etag and parts_suffix: await self._copy_etag_preserved( path1, path2, size, total_parts=int(parts_suffix) ) + elif size <= MANAGED_COPY_THRESHOLD: + # simple copy allowed for <5GB + await self._copy_basic(path1, path2, **kwargs) else: + # if the preserve_etag is true, either the file is uploaded + # on multiple parts or the size is lower than 5GB + assert not preserve_etag + # serial multipart copy await self._copy_managed(path1, path2, size, **kwargs) From a33c513d1d28229c2f1efe6f7dac1e99b9586d6a Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 17 Mar 2021 00:12:16 +0300 Subject: [PATCH 8/8] typo --- s3fs/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3fs/core.py b/s3fs/core.py index b4a61a7b..c60b66e7 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -1402,7 +1402,7 @@ async def _copy_managed(self, path1, path2, size, block=5 * 2 ** 30, **kwargs): async def _cp_file(self, path1, path2, preserve_etag=None, **kwargs): """Copy file between locations on S3. - + preserve_etag: bool Whether to preserve etag while copying. If the file is uploaded as a single part, then it will be always equalivent to the md5