From 271ad7262fdf1883fcbe71d02bfcdcb37c67cfe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?= Date: Tue, 31 May 2022 14:16:34 +0545 Subject: [PATCH 1/5] test --- src/dvc_objects/fs/utils.py | 42 ++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/dvc_objects/fs/utils.py b/src/dvc_objects/fs/utils.py index 1d30ad1..9dd9643 100644 --- a/src/dvc_objects/fs/utils.py +++ b/src/dvc_objects/fs/utils.py @@ -143,6 +143,7 @@ def copyfile( callback: "Callback" = None, no_progress_bar: bool = False, name: str = None, + reflink: bool = False, ) -> None: """Copy file with progress bar""" name = name if name else os.path.basename(dest) @@ -154,25 +155,28 @@ def copyfile( if callback: callback.set_size(total) - try: - system.reflink(src, dest) - except OSError: - from .callbacks import Callback - - with open(src, "rb") as fsrc, open(dest, "wb+") as fdest: - with Callback.as_tqdm_callback( - callback, - size=total, - bytes=True, - disable=no_progress_bar, - desc=name, - ) as cb: - wrapped = cb.wrap_attr(fdest, "write") - while True: - buf = fsrc.read(LOCAL_CHUNK_SIZE) - if not buf: - break - wrapped.write(buf) + if reflink: + try: + return system.reflink(src, dest) + except OSError: + pass + + from .callbacks import Callback + + with open(src, "rb") as fsrc, open(dest, "wb+") as fdest: + with Callback.as_tqdm_callback( + callback, + size=total, + bytes=True, + disable=no_progress_bar, + desc=name, + ) as cb: + wrapped = cb.wrap_attr(fdest, "write") + while True: + buf = fsrc.read(LOCAL_CHUNK_SIZE) + if not buf: + break + wrapped.write(buf) if callback: callback.absolute_update(total) From e035a507c22da804e5c2698ccdffd1855edcf655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?= Date: Tue, 31 May 2022 19:23:39 +0545 Subject: [PATCH 2/5] temp --- src/dvc_objects/fs/fastcopy.py | 76 ++++++++++++++++++++++++++++++++++ src/dvc_objects/fs/utils.py | 29 ++++--------- 2 files changed, 84 insertions(+), 21 deletions(-) create mode 100644 src/dvc_objects/fs/fastcopy.py diff --git a/src/dvc_objects/fs/fastcopy.py b/src/dvc_objects/fs/fastcopy.py new file mode 100644 index 0000000..e02832c --- /dev/null +++ b/src/dvc_objects/fs/fastcopy.py @@ -0,0 +1,76 @@ +import os +import posix +import sys +from shutil import _fastcopy_fcopyfile # type: ignore[attr-defined] +from shutil import _fastcopy_sendfile # type: ignore[attr-defined] +from shutil import _GiveupOnFastCopy # type: ignore[attr-defined] +from shutil import copyfileobj + +_WINDOWS = os.name == "nt" +COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 2**20 +_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") +_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS + + +def _copyfileobj_readinto(fsrc, fdst, callback, length=COPY_BUFSIZE): + """readinto()/memoryview() based variant of copyfileobj(). + *fsrc* must support readinto() method and both files must be + open in binary mode. + """ + # Localize variable access to minimize overhead. + fsrc_readinto = fsrc.readinto + fdst_write = fdst.write + with memoryview(bytearray(length)) as mv: + while n := fsrc_readinto(mv): + callback.relative_update(n) + if n >= length: + fdst_write(mv) + continue + + with mv[:n] as smv: + fdst.write(smv) + + +def _copyfileobj(fsrc, fdst, callback, length=COPY_BUFSIZE): + file_size = os.fstat(fsrc.fileno()).st_size + callback.set_size(file_size) + + if _WINDOWS and file_size > 0: + # Windows, see: + # https://github.com/python/cpython/pull/7160#discussion_r195405230 + return _copyfileobj_readinto( + fsrc, fdst, callback, min(file_size, length) + ) + + wrapped = callback.wrap_attr(fsrc) + copyfileobj(wrapped, fdst, length=length) + + +def _copyfile(fsrc, fdst, callback): + if _HAS_FCOPYFILE: # macOS + try: + return _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) + except _GiveupOnFastCopy: + pass + + if _USE_CP_SENDFILE: + try: + return _fastcopy_sendfile(fsrc, fdst) + except _GiveupOnFastCopy: + pass + + return _copyfileobj(fsrc, fdst, callback) + + +def copyfile(src, dst, *, callback=None): + from .callbacks import Callback + + with open(src, "rb") as fsrc: + try: + with open(dst, "wb") as fdst, Callback.as_callback(callback) as cb: + _copyfile(fsrc, fdst, cb) + return dst + except IsADirectoryError as e: + if os.path.exists(dst): + raise + raise FileNotFoundError(f"Directory does not exist: {dst}") from e diff --git a/src/dvc_objects/fs/utils.py b/src/dvc_objects/fs/utils.py index 9dd9643..6c9a2ce 100644 --- a/src/dvc_objects/fs/utils.py +++ b/src/dvc_objects/fs/utils.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Iterator from . import system +from .fastcopy import copyfile as fast_copyfile if TYPE_CHECKING: from .base import AnyFSPath, FileSystem @@ -147,14 +148,10 @@ def copyfile( ) -> None: """Copy file with progress bar""" name = name if name else os.path.basename(dest) - total = os.stat(src).st_size if os.path.isdir(dest): dest = os.path.join(dest, os.path.basename(src)) - if callback: - callback.set_size(total) - if reflink: try: return system.reflink(src, dest) @@ -163,23 +160,13 @@ def copyfile( from .callbacks import Callback - with open(src, "rb") as fsrc, open(dest, "wb+") as fdest: - with Callback.as_tqdm_callback( - callback, - size=total, - bytes=True, - disable=no_progress_bar, - desc=name, - ) as cb: - wrapped = cb.wrap_attr(fdest, "write") - while True: - buf = fsrc.read(LOCAL_CHUNK_SIZE) - if not buf: - break - wrapped.write(buf) - - if callback: - callback.absolute_update(total) + with Callback.as_tqdm_callback( + callback, + bytes=True, + disable=no_progress_bar, + desc=name, + ) as cb: + fast_copyfile(src, dest, callback=cb) def tmp_fname(fname: "AnyFSPath" = "") -> "AnyFSPath": From 43d78e19ed8e2601bab1b15f9df62be4f97806f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?= Date: Mon, 6 Jun 2022 09:55:07 +0545 Subject: [PATCH 3/5] benchmark and copyfile with copy_file_range --- setup.cfg | 1 + src/dvc_objects/fs/fastcopy.py | 106 +++++++++++++++++++++++---- tests/benchmarks/__init__.py | 0 tests/benchmarks/test_copy.py | 127 +++++++++++++++++++++++++++++++++ 4 files changed, 220 insertions(+), 14 deletions(-) create mode 100644 tests/benchmarks/__init__.py create mode 100644 tests/benchmarks/test_copy.py diff --git a/setup.cfg b/setup.cfg index cf5e360..c777958 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,6 +35,7 @@ install_requires= [options.extras_require] tests = pytest==7.1.2 + pytest-benchmark==3.4.1 pytest-sugar==0.9.4 pytest-cov==3.0.0 pytest-mock==3.7.0 diff --git a/src/dvc_objects/fs/fastcopy.py b/src/dvc_objects/fs/fastcopy.py index e02832c..be4070b 100644 --- a/src/dvc_objects/fs/fastcopy.py +++ b/src/dvc_objects/fs/fastcopy.py @@ -1,18 +1,84 @@ +import errno import os import posix import sys -from shutil import _fastcopy_fcopyfile # type: ignore[attr-defined] -from shutil import _fastcopy_sendfile # type: ignore[attr-defined] -from shutil import _GiveupOnFastCopy # type: ignore[attr-defined] -from shutil import copyfileobj +from shutil import _fastcopy_fcopyfile as _fcopyfile # type: ignore +from shutil import _fastcopy_sendfile as _sendfile # type: ignore +from shutil import _GiveupOnFastCopy # type: ignore +from shutil import copyfileobj as _copyfileobj_shutil _WINDOWS = os.name == "nt" COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 2**20 _USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") _HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS +_USE_CP_COPY_FILE_RANGE = hasattr(os, "copy_file_range") -def _copyfileobj_readinto(fsrc, fdst, callback, length=COPY_BUFSIZE): +def _determine_linux_fastcopy_blocksize(infd): + """Determine blocksize for fastcopying on Linux. + Hopefully the whole file will be copied in a single call. + The copying itself should be performed in a loop 'till EOF is + reached (0 return) so a blocksize smaller or bigger than the actual + file size should not make any difference, also in case the file + content changes while being copied. + """ + try: + blocksize = max(os.fstat(infd).st_size, 2**23) # min 8 MiB + except OSError: + blocksize = 2**27 # 128 MiB + # On 32-bit architectures truncate to 1 GiB to avoid OverflowError, + # see gh-82500. + if sys.maxsize < 2**32: + blocksize = min(blocksize, 2**30) + return blocksize + + +def _copy_file_range(fsrc, fdst): + """Copy data from one regular mmap-like fd to another by using + a high-performance copy_file_range(2) syscall that gives filesystems + an opportunity to implement the use of reflinks or server-side copy. + This should work on Linux >= 4.5 only. + + See https://github.com/python/cpython/pull/93152. + """ + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + blocksize = _determine_linux_fastcopy_blocksize(infd) + offset = 0 + while True: + try: + n_copied = os.copy_file_range( + infd, outfd, blocksize, offset_dst=offset + ) + except OSError as err: + # ...in order to have a more informative exception. + err.filename = fsrc.name + err.filename2 = fdst.name + + if err.errno == errno.ENOSPC: # filesystem is full + raise err from None + + # Give up on first call and if no data was copied. + if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: + raise _GiveupOnFastCopy(err) + + raise err + else: + if n_copied == 0: + # If no bytes have been copied yet, copy_file_range + # might silently fail. + # https://lore.kernel.org/linux-fsdevel/20210126233840.GG4626@dread.disaster.area/T/#m05753578c7f7882f6e9ffe01f981bc223edef2b0 + if offset == 0: + raise _GiveupOnFastCopy() + break + offset += n_copied + + +def _copyfileobj_readinto(fsrc, fdst, callback=None, length=COPY_BUFSIZE): """readinto()/memoryview() based variant of copyfileobj(). *fsrc* must support readinto() method and both files must be open in binary mode. @@ -22,7 +88,8 @@ def _copyfileobj_readinto(fsrc, fdst, callback, length=COPY_BUFSIZE): fdst_write = fdst.write with memoryview(bytearray(length)) as mv: while n := fsrc_readinto(mv): - callback.relative_update(n) + if callback: + callback.relative_update(n) if n >= length: fdst_write(mv) continue @@ -31,9 +98,10 @@ def _copyfileobj_readinto(fsrc, fdst, callback, length=COPY_BUFSIZE): fdst.write(smv) -def _copyfileobj(fsrc, fdst, callback, length=COPY_BUFSIZE): +def _copyfileobj(fsrc, fdst, callback=None, length=COPY_BUFSIZE): file_size = os.fstat(fsrc.fileno()).st_size - callback.set_size(file_size) + if callback: + callback.set_size(file_size) if _WINDOWS and file_size > 0: # Windows, see: @@ -42,33 +110,43 @@ def _copyfileobj(fsrc, fdst, callback, length=COPY_BUFSIZE): fsrc, fdst, callback, min(file_size, length) ) - wrapped = callback.wrap_attr(fsrc) - copyfileobj(wrapped, fdst, length=length) + wrapped = callback.wrap_attr(fsrc) if callback else fsrc + _copyfileobj_shutil(wrapped, fdst, length=length) def _copyfile(fsrc, fdst, callback): if _HAS_FCOPYFILE: # macOS try: - return _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) + return _fcopyfile( + fsrc, + fdst, + posix._COPYFILE_DATA, # pylint: disable=protected-access + ) + except _GiveupOnFastCopy: + pass + + if _USE_CP_COPY_FILE_RANGE: + try: + return _copy_file_range(fsrc, fdst) except _GiveupOnFastCopy: pass if _USE_CP_SENDFILE: try: - return _fastcopy_sendfile(fsrc, fdst) + return _sendfile(fsrc, fdst) except _GiveupOnFastCopy: pass return _copyfileobj(fsrc, fdst, callback) -def copyfile(src, dst, *, callback=None): +def copyfile(src, dst, *, callback=None, copy_function=_copyfile): from .callbacks import Callback with open(src, "rb") as fsrc: try: with open(dst, "wb") as fdst, Callback.as_callback(callback) as cb: - _copyfile(fsrc, fdst, cb) + copy_function(fsrc, fdst, cb) return dst except IsADirectoryError as e: if os.path.exists(dst): diff --git a/tests/benchmarks/__init__.py b/tests/benchmarks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/benchmarks/test_copy.py b/tests/benchmarks/test_copy.py new file mode 100644 index 0000000..bd28982 --- /dev/null +++ b/tests/benchmarks/test_copy.py @@ -0,0 +1,127 @@ +import os +import posix +import shutil +from functools import partial +from pathlib import Path + +import pytest + +from dvc_objects.fs import fastcopy, system +from dvc_objects.fs.utils import human_readable_to_bytes, remove + +TEST_DIR = Path(__file__).resolve().parents[1] / "copy-test-dir" +FILE_SIZES = [ + "1KB", + "10KB", + "100KB", + "1MB", + "10MB", + "100MB", + "1GB", + # "2GB", + # "5GB", + # "10GB", +] + + +def write_random_data( + path: Path, size: int, chunk_size: int = 1024 * 1024 * 1024 +) -> None: + TEST_DIR.mkdir(exist_ok=True, parents=True) + try: + with path.open("wb") as fobj: + while size > chunk_size: + fobj.write(os.urandom(chunk_size)) + size -= chunk_size + fobj.write(os.urandom(size)) + except: # noqa: E722, B001 + remove(os.fspath(path)) + raise + + +def copyfile_length(src, dst, length=0): + with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: + shutil.copyfileobj(fsrc, fdst, length=0) + + +def copyfile_read(src, dst): + with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: + fastcopy._copyfileobj(fsrc, fdst) + + +def copyfile_readinto(src, dst): + with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: + file_size = os.fstat(fsrc.fileno()).st_size + return fastcopy._copyfileobj_readinto( + fsrc, fdst, length=min(file_size, fastcopy.COPY_BUFSIZE) + ) + + +def copyfile_range(src, dst): + with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: + fastcopy._copy_file_range(fsrc, fdst) + + +def copyfile_reflink(src, dst): + return system.reflink(src, dst) + + +def copyfile_sendfile(src, dst): + with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: + fastcopy._sendfile(fsrc, fdst) + + +def copyfile_fcopyfile(src, dst): + with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: + fastcopy._fcopyfile(fsrc, fdst, posix._COPY_FILE_DATA) + + +COPY_FUNCTIONS = { + "fastcopy": fastcopy.copyfile, # platform-specific copy + "shutil_copy": shutil.copyfile, + "read": copyfile_read, # read-write + "readinto": copyfile_readinto, + "64k": partial(copyfile_length, length=64 * 1024), + # "128k": partial(copyfile_length, length=128 * 1024), + # "256k": partial(copyfile_length, length=256 * 1024), + # "512k": partial(copyfile_length, length=512 * 1024), + # "1M": partial(copyfile_length, length=1024 * 1024), + # "4M": partial(copyfile_length, length=4 * 1024 * 1024), + # "10M": partial(copyfile_length, length=10 * 1024 * 1024), + # "100M": partial(copyfile_length, length=100 * 1024 * 1024), +} + +if fastcopy._HAS_FCOPYFILE: + COPY_FUNCTIONS["fcopyfile"] = copyfile_fcopyfile + +if fastcopy._USE_CP_COPY_FILE_RANGE: + COPY_FUNCTIONS["copy_file_range"] = copyfile_range + +if fastcopy._USE_CP_SENDFILE: + COPY_FUNCTIONS["sendfile"] = copyfile_sendfile + +COPY_FUNCTIONS["reflink"] = pytest.param( + copyfile_reflink, marks=pytest.mark.xfail(raises=OSError) +) + + +@pytest.mark.parametrize("hsize", FILE_SIZES) +@pytest.mark.parametrize( + "copy_function", COPY_FUNCTIONS.values(), ids=COPY_FUNCTIONS.keys() +) +def test_sendfile(request, benchmark, copy_function, hsize): + src = TEST_DIR / f"orig-{hsize}" + dst = TEST_DIR / f"dup-{hsize}" + if not src.exists(): + write_random_data(src, human_readable_to_bytes(hsize)) + request.addfinalizer(partial(remove, os.fspath(dst))) + + benchmark(copy_function, src, dst) + assert dst.stat().st_size == src.stat().st_size + + +if __name__ == "__main__": + for hsize in FILE_SIZES: + size = human_readable_to_bytes(hsize) + write_random_data(TEST_DIR / f"orig-{hsize}", size) + print(hsize) From f3756d16dfe628669b9441ed537453d11203e096 Mon Sep 17 00:00:00 2001 From: skshetry Date: Mon, 6 Jun 2022 14:40:11 +0545 Subject: [PATCH 4/5] Update tests.yml --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 66dc514..9005f9e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -42,7 +42,7 @@ jobs: uses: actions/cache@v3 with: path: .nox - key: ${{ runner.os }}-${{ matrix.pyv }}-${{ hashFiles('noxfile.py') }}-nox + key: ${{ runner.os }}-${{ matrix.pyv }}-${{ hashFiles('noxfile.py') }}-nox-2 - name: cache pre-commit hook uses: actions/cache@v3 From ef91fd6db9aba2d23ac853ea2f57de36295310ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?= Date: Mon, 6 Jun 2022 17:54:06 +0545 Subject: [PATCH 5/5] fix lint --- src/dvc_objects/fs/fastcopy.py | 12 +++++++++--- tests/benchmarks/test_copy.py | 11 ++++++++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/dvc_objects/fs/fastcopy.py b/src/dvc_objects/fs/fastcopy.py index be4070b..021fe05 100644 --- a/src/dvc_objects/fs/fastcopy.py +++ b/src/dvc_objects/fs/fastcopy.py @@ -1,12 +1,17 @@ import errno import os -import posix import sys from shutil import _fastcopy_fcopyfile as _fcopyfile # type: ignore from shutil import _fastcopy_sendfile as _sendfile # type: ignore from shutil import _GiveupOnFastCopy # type: ignore from shutil import copyfileobj as _copyfileobj_shutil +try: + import posix # type:ignore +except ImportError: + posix = None # type: ignore + + _WINDOWS = os.name == "nt" COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 2**20 _USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") @@ -51,7 +56,7 @@ def _copy_file_range(fsrc, fdst): offset = 0 while True: try: - n_copied = os.copy_file_range( + n_copied = os.copy_file_range( # pylint: disable=no-member infd, outfd, blocksize, offset_dst=offset ) except OSError as err: @@ -117,10 +122,11 @@ def _copyfileobj(fsrc, fdst, callback=None, length=COPY_BUFSIZE): def _copyfile(fsrc, fdst, callback): if _HAS_FCOPYFILE: # macOS try: + # pylint: disable=protected-access, no-member return _fcopyfile( fsrc, fdst, - posix._COPYFILE_DATA, # pylint: disable=protected-access + posix._COPYFILE_DATA, ) except _GiveupOnFastCopy: pass diff --git a/tests/benchmarks/test_copy.py b/tests/benchmarks/test_copy.py index bd28982..a4fd300 100644 --- a/tests/benchmarks/test_copy.py +++ b/tests/benchmarks/test_copy.py @@ -1,5 +1,4 @@ import os -import posix import shutil from functools import partial from pathlib import Path @@ -9,6 +8,12 @@ from dvc_objects.fs import fastcopy, system from dvc_objects.fs.utils import human_readable_to_bytes, remove +try: + import posix # type: ignore +except ImportError: + posix = None # type: ignore + + TEST_DIR = Path(__file__).resolve().parents[1] / "copy-test-dir" FILE_SIZES = [ "1KB", @@ -73,7 +78,7 @@ def copyfile_sendfile(src, dst): def copyfile_fcopyfile(src, dst): with open(src, "rb") as fsrc, open(dst, "wb+") as fdst: - fastcopy._fcopyfile(fsrc, fdst, posix._COPY_FILE_DATA) + fastcopy._fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) COPY_FUNCTIONS = { @@ -91,7 +96,7 @@ def copyfile_fcopyfile(src, dst): # "100M": partial(copyfile_length, length=100 * 1024 * 1024), } -if fastcopy._HAS_FCOPYFILE: +if posix and fastcopy._HAS_FCOPYFILE: COPY_FUNCTIONS["fcopyfile"] = copyfile_fcopyfile if fastcopy._USE_CP_COPY_FILE_RANGE: