diff --git a/setup.cfg b/setup.cfg
index cf5e360..c777958 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -35,6 +35,7 @@ install_requires=
 [options.extras_require]
 tests =
     pytest==7.1.2
+    pytest-benchmark==3.4.1
     pytest-sugar==0.9.4
     pytest-cov==3.0.0
     pytest-mock==3.7.0
diff --git a/src/dvc_objects/fs/fastcopy.py b/src/dvc_objects/fs/fastcopy.py
new file mode 100644
index 0000000..021fe05
--- /dev/null
+++ b/src/dvc_objects/fs/fastcopy.py
@@ -0,0 +1,178 @@
+import errno
+import os
+import sys
+from shutil import _fastcopy_fcopyfile as _fcopyfile  # type: ignore
+from shutil import _fastcopy_sendfile as _sendfile  # type: ignore
+from shutil import _GiveupOnFastCopy  # type: ignore
+from shutil import copyfileobj as _copyfileobj_shutil
+
+try:
+    import posix  # type:ignore
+except ImportError:
+    posix = None  # type: ignore
+
+
+_WINDOWS = os.name == "nt"
+COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 2**20
+_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
+_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS
+_USE_CP_COPY_FILE_RANGE = hasattr(os, "copy_file_range")
+
+
+def _determine_linux_fastcopy_blocksize(infd):
+    """Determine blocksize for fastcopying on Linux.
+    Hopefully the whole file will be copied in a single call.
+    The copying itself should be performed in a loop 'till EOF is
+    reached (0 return) so a blocksize smaller or bigger than the actual
+    file size should not make any difference, also in case the file
+    content changes while being copied.
+    """
+    try:
+        blocksize = max(os.fstat(infd).st_size, 2**23)  # min 8 MiB
+    except OSError:
+        blocksize = 2**27  # 128 MiB
+    # On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
+    # see gh-82500.
+    if sys.maxsize < 2**32:
+        blocksize = min(blocksize, 2**30)
+    return blocksize
+
+
+def _copy_file_range(fsrc, fdst):
+    """Copy data from one regular mmap-like fd to another by using
+    a high-performance copy_file_range(2) syscall that gives filesystems
+    an opportunity to implement the use of reflinks or server-side copy.
+    This should work on Linux >= 4.5 only.
+
+    See https://github.com/python/cpython/pull/93152.
+    """
+    try:
+        infd = fsrc.fileno()
+        outfd = fdst.fileno()
+    except Exception as err:
+        raise _GiveupOnFastCopy(err)  # not a regular file
+
+    blocksize = _determine_linux_fastcopy_blocksize(infd)
+    offset = 0
+    while True:
+        try:
+            n_copied = os.copy_file_range(  # pylint: disable=no-member
+                infd, outfd, blocksize, offset_dst=offset
+            )
+        except OSError as err:
+            # ...in order to have a more informative exception.
+            err.filename = fsrc.name
+            err.filename2 = fdst.name
+
+            if err.errno == errno.ENOSPC:  # filesystem is full
+                raise err from None
+
+            # Give up on first call and if no data was copied.
+            if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
+                raise _GiveupOnFastCopy(err)
+
+            raise err
+        else:
+            if n_copied == 0:
+                # If no bytes have been copied yet, copy_file_range
+                # might silently fail.
+                # https://lore.kernel.org/linux-fsdevel/20210126233840.GG4626@dread.disaster.area/T/#m05753578c7f7882f6e9ffe01f981bc223edef2b0
+                if offset == 0:
+                    raise _GiveupOnFastCopy()
+                break
+            offset += n_copied
+
+
+def _copyfileobj_readinto(fsrc, fdst, callback=None, length=COPY_BUFSIZE):
+    """readinto()/memoryview() based variant of copyfileobj().
+    *fsrc* must support readinto() method and both files must be
+    open in binary mode.
+    """
+    # Localize variable access to minimize overhead.
+    fsrc_readinto = fsrc.readinto
+    fdst_write = fdst.write
+    with memoryview(bytearray(length)) as mv:
+        while n := fsrc_readinto(mv):
+            if callback:
+                callback.relative_update(n)
+            if n >= length:
+                fdst_write(mv)
+                continue
+
+            with mv[:n] as smv:
+                fdst_write(smv)
+
+
+def _copyfileobj(fsrc, fdst, callback=None, length=COPY_BUFSIZE):
+    """Copy *fsrc* to *fdst* through a userspace buffer.
+
+    If *callback* is given, it receives the source size via ``set_size``.
+    On Windows, non-empty files are copied by ``_copyfileobj_readinto``.
+    Otherwise ``shutil.copyfileobj`` is used, reading through
+    ``callback.wrap_attr(fsrc)`` when a callback is given.
+    """
+    file_size = os.fstat(fsrc.fileno()).st_size
+    if callback:
+        callback.set_size(file_size)
+
+    if _WINDOWS and file_size > 0:
+        # Windows, see:
+        # https://github.com/python/cpython/pull/7160#discussion_r195405230
+        return _copyfileobj_readinto(
+            fsrc, fdst, callback, min(file_size, length)
+        )
+
+    wrapped = callback.wrap_attr(fsrc) if callback else fsrc
+    _copyfileobj_shutil(wrapped, fdst, length=length)
+
+
+def _copyfile(fsrc, fdst, callback):
+    """Copy *fsrc* to *fdst*, preferring platform-specific fast paths.
+
+    Tries fcopyfile (macOS), copy_file_range and sendfile, in that order,
+    where available, and falls back to ``_copyfileobj`` if they give up.
+    NOTE: *callback* is only passed on to the ``_copyfileobj`` fallback.
+    """
+    if _HAS_FCOPYFILE:  # macOS
+        try:
+            # pylint: disable=protected-access, no-member
+            return _fcopyfile(
+                fsrc,
+                fdst,
+                posix._COPYFILE_DATA,
+            )
+        except _GiveupOnFastCopy:
+            pass
+
+    if _USE_CP_COPY_FILE_RANGE:
+        try:
+            return _copy_file_range(fsrc, fdst)
+        except _GiveupOnFastCopy:
+            pass
+
+    if _USE_CP_SENDFILE:
+        try:
+            return _sendfile(fsrc, fdst)
+        except _GiveupOnFastCopy:
+            pass
+
+    return _copyfileobj(fsrc, fdst, callback)
+
+
+def copyfile(src, dst, *, callback=None, copy_function=_copyfile):
+    """Copy the file at *src* to *dst* with *copy_function*; return *dst*.
+
+    An IsADirectoryError raised for a *dst* that does not exist is
+    re-raised as FileNotFoundError.
+    """
+    from .callbacks import Callback
+
+    with open(src, "rb") as fsrc:
+        try:
+            with open(dst, "wb") as fdst, Callback.as_callback(callback) as cb:
+                copy_function(fsrc, fdst, cb)
+                return dst
+        except IsADirectoryError as e:
+            if os.path.exists(dst):
+                raise
+            raise FileNotFoundError(f"Directory does not exist: {dst}") from e
diff --git a/src/dvc_objects/fs/utils.py b/src/dvc_objects/fs/utils.py
index 1d30ad1..6c9a2ce 100644
--- a/src/dvc_objects/fs/utils.py
+++ b/src/dvc_objects/fs/utils.py
@@ -8,6 +8,7 @@
 from typing import TYPE_CHECKING, Iterator
 
 from . import system
+from .fastcopy import copyfile as fast_copyfile
 
 if TYPE_CHECKING:
     from .base import AnyFSPath, FileSystem
@@ -143,39 +144,29 @@ def copyfile(
     callback: "Callback" = None,
     no_progress_bar: bool = False,
     name: str = None,
+    reflink: bool = False,
 ) -> None:
     """Copy file with progress bar"""
     name = name if name else os.path.basename(dest)
-    total = os.stat(src).st_size
 
     if os.path.isdir(dest):
         dest = os.path.join(dest, os.path.basename(src))
 
-    if callback:
-        callback.set_size(total)
+    if reflink:
+        try:
+            return system.reflink(src, dest)
+        except OSError:
+            pass
 
-    try:
-        system.reflink(src, dest)
-    except OSError:
-        from .callbacks import Callback
-
-        with open(src, "rb") as fsrc, open(dest, "wb+") as fdest:
-            with Callback.as_tqdm_callback(
-                callback,
-                size=total,
-                bytes=True,
-                disable=no_progress_bar,
-                desc=name,
-            ) as cb:
-                wrapped = cb.wrap_attr(fdest, "write")
-                while True:
-                    buf = fsrc.read(LOCAL_CHUNK_SIZE)
-                    if not buf:
-                        break
-                    wrapped.write(buf)
-
-    if callback:
-        callback.absolute_update(total)
+    from .callbacks import Callback
+
+    with Callback.as_tqdm_callback(
+        callback,
+        bytes=True,
+        disable=no_progress_bar,
+        desc=name,
+    ) as cb:
+        fast_copyfile(src, dest, callback=cb)
 
 
 def tmp_fname(fname: "AnyFSPath" = "") -> "AnyFSPath":
diff --git a/tests/benchmarks/__init__.py b/tests/benchmarks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/benchmarks/test_copy.py b/tests/benchmarks/test_copy.py
new file mode 100644
index 0000000..a4fd300
--- /dev/null
+++ b/tests/benchmarks/test_copy.py
@@ -0,0 +1,133 @@
+import os
+import shutil
+from functools import partial
+from pathlib import Path
+
+import pytest
+
+from dvc_objects.fs import fastcopy, system
+from dvc_objects.fs.utils import human_readable_to_bytes, remove
+
+try:
+    import posix  # type: ignore
+except ImportError:
+    posix = None  # type: ignore
+
+
+TEST_DIR = Path(__file__).resolve().parents[1] / "copy-test-dir"
+FILE_SIZES = [
+    "1KB",
+    "10KB",
+    "100KB",
+    "1MB",
+    "10MB",
+    "100MB",
+    "1GB",
+    # "2GB",
+    # "5GB",
+    # "10GB",
+]
+
+
+def write_random_data(
+    path: Path, size: int, chunk_size: int = 1024 * 1024 * 1024
+) -> None:
+    TEST_DIR.mkdir(exist_ok=True, parents=True)
+    try:
+        with path.open("wb") as fobj:
+            while size > chunk_size:
+                fobj.write(os.urandom(chunk_size))
+                size -= chunk_size
+            fobj.write(os.urandom(size))
+    except:  # noqa: E722, B001
+        remove(os.fspath(path))
+        raise
+
+
+def copyfile_length(src, dst, length=0):
+    """Copy *src* to *dst* with ``shutil.copyfileobj`` using *length*."""
+    with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
+        shutil.copyfileobj(fsrc, fdst, length=length)
+
+
+def copyfile_read(src, dst):
+    with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
+        fastcopy._copyfileobj(fsrc, fdst)
+
+
+def copyfile_readinto(src, dst):
+    with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
+        file_size = os.fstat(fsrc.fileno()).st_size
+        return fastcopy._copyfileobj_readinto(
+            fsrc, fdst, length=min(file_size, fastcopy.COPY_BUFSIZE)
+        )
+
+
+def copyfile_range(src, dst):
+    with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
+        fastcopy._copy_file_range(fsrc, fdst)
+
+
+def copyfile_reflink(src, dst):
+    return system.reflink(src, dst)
+
+
+def copyfile_sendfile(src, dst):
+    with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
+        fastcopy._sendfile(fsrc, fdst)
+
+
+def copyfile_fcopyfile(src, dst):
+    with open(src, "rb") as fsrc, open(dst, "wb+") as fdst:
+        fastcopy._fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
+
+
+COPY_FUNCTIONS = {
+    "fastcopy": fastcopy.copyfile,  # platform-specific copy
+    "shutil_copy": shutil.copyfile,
+    "read": copyfile_read,  # read-write
+    "readinto": copyfile_readinto,
+    "64k": partial(copyfile_length, length=64 * 1024),
+    # "128k": partial(copyfile_length, length=128 * 1024),
+    # "256k": partial(copyfile_length, length=256 * 1024),
+    # "512k": partial(copyfile_length, length=512 * 1024),
+    # "1M": partial(copyfile_length, length=1024 * 1024),
+    # "4M": partial(copyfile_length, length=4 * 1024 * 1024),
+    # "10M": partial(copyfile_length, length=10 * 1024 * 1024),
+    # "100M": partial(copyfile_length, length=100 * 1024 * 1024),
+}
+
+if posix and fastcopy._HAS_FCOPYFILE:
+    COPY_FUNCTIONS["fcopyfile"] = copyfile_fcopyfile
+
+if fastcopy._USE_CP_COPY_FILE_RANGE:
+    COPY_FUNCTIONS["copy_file_range"] = copyfile_range
+
+if fastcopy._USE_CP_SENDFILE:
+    COPY_FUNCTIONS["sendfile"] = copyfile_sendfile
+
+COPY_FUNCTIONS["reflink"] = pytest.param(
+    copyfile_reflink, marks=pytest.mark.xfail(raises=OSError)
+)
+
+
+@pytest.mark.parametrize("hsize", FILE_SIZES)
+@pytest.mark.parametrize(
+    "copy_function", COPY_FUNCTIONS.values(), ids=COPY_FUNCTIONS.keys()
+)
+def test_sendfile(request, benchmark, copy_function, hsize):
+    src = TEST_DIR / f"orig-{hsize}"
+    dst = TEST_DIR / f"dup-{hsize}"
+    if not src.exists():
+        write_random_data(src, human_readable_to_bytes(hsize))
+    request.addfinalizer(partial(remove, os.fspath(dst)))
+
+    benchmark(copy_function, src, dst)
+    assert dst.stat().st_size == src.stat().st_size
+
+
+if __name__ == "__main__":
+    for hsize in FILE_SIZES:
+        size = human_readable_to_bytes(hsize)
+        write_random_data(TEST_DIR / f"orig-{hsize}", size)
+        print(hsize)