diff --git a/.flake8 b/.flake8 index b4f70a67..a52e6824 100644 --- a/.flake8 +++ b/.flake8 @@ -16,8 +16,6 @@ max-complexity = 15 select = B,C,E,F,W,T4,B902,T,P show_source = true count = true -per-file-ignores = - upath/__init__.py: F401 exclude = .noxfile, .nox, diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7f672d01..3dabe55f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -60,7 +60,12 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: '3.9' + python-version: | + 3.9 + 3.10 + 3.11 + 3.12 + 3.13 - name: Install nox run: python -m pip install --upgrade nox diff --git a/README.md b/README.md index 5acb4ff3..485f5354 100644 --- a/README.md +++ b/README.md @@ -645,15 +645,7 @@ installation of a newer version of its upstream dependencies. Below you can find a list of known issues and their solutions. We attempt to keep this list updated whenever we encounter more: -- **UPath().glob()**: - `fsspec` fixed glob behavior when handling `**` patterns in `fsspec>=2023.9.0` -- **GCSPath().mkdir()**: - a few mkdir quirks are solved by installing `gcsfs>=2022.7.1` -- **fsspec.filesystem(WebdavPath().protocol)** - the webdav protocol was added to fsspec in version `fsspec>=2022.5.0` -- **stat.S_ISDIR(HTTPPath().stat().st_mode)** - requires `fsspec>=2024.2.0` to correctly return `True` for directories - +- currently none :sparkles: ## Contributing diff --git a/noxfile.py b/noxfile.py index a61efb1b..0fbfd662 100644 --- a/noxfile.py +++ b/noxfile.py @@ -8,14 +8,15 @@ nox.options.reuse_existing_virtualenvs = True nox.options.sessions = "lint", "tests" locations = ("upath",) -hide_pip_install = os.environ.get("CI", "") == "" +running_in_ci = os.environ.get("CI", "") != "" @nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) def tests(session: nox.Session) -> None: # workaround in case no aiohttp binary wheels are available session.env["AIOHTTP_NO_EXTENSIONS"] = "1" - session.install(".[tests,dev]", silent=hide_pip_install) + session.install(".[tests,dev]") + session.run("python", "-m", "pip", "freeze", silent=not running_in_ci) session.run( "pytest", "-m", @@ -29,7 +30,8 @@ def tests(session: nox.Session) -> None: @nox.session(python="3.9", name="tests-minversion") def tests_minversion(session: nox.Session) -> None: - session.install("fsspec==2022.1.0", ".[tests,dev]") + session.install("fsspec==2024.5.0", ".[tests,dev]") + session.run("python", "-m", "pip", "freeze", silent=not running_in_ci) session.run( "pytest", "-m", @@ -92,7 +94,7 @@ def type_checking(session): session.run("python", "-m", "mypy") -@nox.session +@nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) def typesafety(session): session.install("-e", ".[tests]") session.run( diff --git a/pyproject.toml b/pyproject.toml index a2bc3032..aec91be3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ maintainers = [ ] requires-python = ">=3.9" dependencies = [ - "fsspec >=2022.1.0,!=2024.3.1", + "fsspec >=2024.5.0", "pathlib-abc ==0.4.3", ] classifiers = [ @@ -45,11 +45,11 @@ tests = [ "packaging", ] dev = [ - "adlfs", + "adlfs>=2024", "aiohttp", "requests", - "gcsfs>=2022.1.0", - "s3fs>=2022.1.0", + "gcsfs>=2024.5.0", + "s3fs>=2024.5.0", "moto[s3,server]", "webdav4[fsspec]", "paramiko", @@ -164,6 +164,10 @@ ignore_missing_imports = true module = "pathlib_abc.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "smbprotocol.*" +ignore_missing_imports = true + [tool.pylint.format] max-line-length = 88 diff --git a/typesafety/test_upath_interface.yml b/typesafety/test_upath_interface.yml index a41ce792..ba98c36f 100644 --- a/typesafety/test_upath_interface.yml +++ b/typesafety/test_upath_interface.yml @@ -64,7 +64,7 @@ from upath import UPath p = UPath("abc") - reveal_type(p.parts) # N: Revealed type is "builtins.tuple[builtins.str, ...]" + reveal_type(p.parts) # N: Revealed type is "typing.Sequence[builtins.str]" - case: upath_drive disable_cache: false @@ -294,14 +294,14 @@ main: | from upath import UPath - reveal_type(UPath("abc").glob("efg")) # N: Revealed type is "typing.Generator[upath.core.UPath, None, None]" + reveal_type(UPath("abc").glob("efg")) # N: Revealed type is "typing.Iterator[upath.core.UPath]" - case: upath_rglob disable_cache: false main: | from upath import UPath - reveal_type(UPath("abc").rglob("efg")) # N: Revealed type is "typing.Generator[upath.core.UPath, None, None]" + reveal_type(UPath("abc").rglob("efg")) # N: Revealed type is "typing.Iterator[upath.core.UPath]" - case: upath_is_dir disable_cache: false @@ -364,7 +364,7 @@ main: | from upath import UPath - reveal_type(UPath("abc").iterdir()) # N: Revealed type is "typing.Generator[upath.core.UPath, None, None]" + reveal_type(UPath("abc").iterdir()) # N: Revealed type is "typing.Iterator[upath.core.UPath]" - case: upath_lchmod disable_cache: false @@ -534,33 +534,15 @@ reveal_type(UPath("abc").write_text("efg")) # N: Revealed type is "builtins.int" -- case: upath_link_to_py39 +- case: upath_link_to disable_cache: false - mypy_config: python_version = 3.9 - main: | - from upath import UPath - - UPath("abc").link_to - -- case: upath_link_to_py312plus - disable_cache: false - mypy_config: python_version = 3.12 main: | from upath import UPath UPath("abc").link_to # E: "UPath" has no attribute "link_to" [attr-defined] -- case: upath_walk_py39 - disable_cache: false - mypy_config: python_version = 3.9 - main: | - from upath import UPath - - UPath("abc").walk # E: "UPath" has no attribute "walk" [attr-defined] - -- case: upath_walk_py312plus +- case: upath_walk disable_cache: false - mypy_config: python_version = 3.12 main: | from upath import UPath diff --git a/upath/__init__.py b/upath/__init__.py index 1cd4a44f..d08612a9 100644 --- a/upath/__init__.py +++ b/upath/__init__.py @@ -1,7 +1,5 @@ """Pathlib API extended to use fsspec backends.""" -import sys - try: from upath._version import __version__ except ImportError: diff --git a/upath/_compat.py b/upath/_compat.py deleted file mode 100644 index f4f7c038..00000000 --- a/upath/_compat.py +++ /dev/null @@ -1,397 +0,0 @@ -from __future__ import annotations - -import ntpath -import os -import posixpath -import sys -import warnings -from collections.abc import Sequence -from functools import wraps -from pathlib import Path -from pathlib import PurePath -from typing import Callable -from typing import TypeVar - -__all__ = [ - "PathlibPathShim", - "deprecated", -] - - -if sys.version_info >= (3, 12): # noqa: C901 - - class PathlibPathShim: - """no need to shim pathlib.Path in Python 3.12+""" - - __slots__ = () - __missing_py312_slots__ = () - - def __init__(self, *args): - super().__init__(*args) - -else: - - def _get_missing_py312_pathlib_slots(): - """Return a tuple of slots that are present in Python 3.12's - pathlib.Path but not in the current version of pathlib.Path - """ - py312_slots = ( - "_raw_paths", - "_drv", - "_root", - "_tail_cached", - "_str", - "_str_normcase_cached", - "_parts_normcase_cached", - "_lines_cached", - "_hash", - ) - current_slots = [ - slot for cls in Path.__mro__ for slot in getattr(cls, "__slots__", []) - ] - return tuple([slot for slot in py312_slots if slot not in current_slots]) - - class PathlibPathShim: - """A compatibility shim for python < 3.12 - - Basically vendoring the functionality of pathlib.Path from Python 3.12 - that's not overwritten in upath.core.UPath - - """ - - __slots__ = () - __missing_py312_slots__ = _get_missing_py312_pathlib_slots() - - def __init__(self, *args): - paths = [] - for arg in args: - if isinstance(arg, PurePath) and hasattr(arg, "_raw_paths"): - if arg._flavour is ntpath and self._flavour is posixpath: - # GH-103631: Convert separators for backwards compatibility. - paths.extend(path.replace("\\", "/") for path in arg._raw_paths) - else: - paths.extend(arg._raw_paths) - else: - try: - path = os.fspath(arg) - except TypeError: - path = arg - if not isinstance(path, str): - raise TypeError( - "argument should be a str or an os.PathLike " - "object where __fspath__ returns a str, " - f"not {type(path).__name__!r}" - ) - paths.append(path) - self._raw_paths = paths - - @classmethod - def _parse_path(cls, path): - if not path: - return "", "", [] - sep = cls._flavour.sep - altsep = cls._flavour.altsep - if altsep: - path = path.replace(altsep, sep) - drv, root, rel = cls._flavour.splitroot(path) - if not root and drv.startswith(sep) and not drv.endswith(sep): - drv_parts = drv.split(sep) - if len(drv_parts) == 4 and drv_parts[2] not in "?.": - # e.g. //server/share - root = sep - elif len(drv_parts) == 6: - # e.g. //?/unc/server/share - root = sep - parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] - return drv, root, parsed - - def _load_parts(self): - paths = self._raw_paths - if len(paths) == 0: - path = "" - elif len(paths) == 1: - path = paths[0] - else: - path = self._flavour.join(*paths) - drv, root, tail = self._parse_path(path) - self._drv = drv - self._root = root - self._tail_cached = tail - - def _from_parsed_parts(self, drv, root, tail): - path_str = self._format_parsed_parts(drv, root, tail) - path = self.with_segments(path_str) - path._str = path_str or "." - path._drv = drv - path._root = root - path._tail_cached = tail - return path - - @classmethod - def _format_parsed_parts(cls, drv, root, tail): - if drv or root: - return drv + root + cls._flavour.sep.join(tail) - elif tail and cls._flavour.splitdrive(tail[0])[0]: - tail = ["."] + tail - return cls._flavour.sep.join(tail) - - def __str__(self): - try: - return self._str - except AttributeError: - self._str = ( - self._format_parsed_parts(self.drive, self.root, self._tail) or "." - ) - return self._str - - @property - def drive(self): - try: - return self._drv - except AttributeError: - self._load_parts() - return self._drv - - @property - def root(self): - try: - return self._root - except AttributeError: - self._load_parts() - return self._root - - @property - def _tail(self): - try: - return self._tail_cached - except AttributeError: - self._load_parts() - return self._tail_cached - - @property - def anchor(self): - anchor = self.drive + self.root - return anchor - - @property - def name(self): - tail = self._tail - if not tail: - return "" - return tail[-1] - - @property - def suffix(self): - name = self.name - i = name.rfind(".") - if 0 < i < len(name) - 1: - return name[i:] - else: - return "" - - @property - def suffixes(self): - name = self.name - if name.endswith("."): - return [] - name = name.lstrip(".") - return ["." + suffix for suffix in name.split(".")[1:]] - - @property - def stem(self): - name = self.name - i = name.rfind(".") - if 0 < i < len(name) - 1: - return name[:i] - else: - return name - - def with_name(self, name): - if not self.name: - raise ValueError(f"{self!r} has an empty name") - f = self._flavour - if ( - not name - or f.sep in name - or (f.altsep and f.altsep in name) - or name == "." - ): - raise ValueError("Invalid name %r" % (name)) - return self._from_parsed_parts( - self.drive, self.root, self._tail[:-1] + [name] - ) - - def with_stem(self, stem): - return self.with_name(stem + self.suffix) - - def with_suffix(self, suffix): - f = self._flavour - if f.sep in suffix or f.altsep and f.altsep in suffix: - raise ValueError(f"Invalid suffix {suffix!r}") - if suffix and not suffix.startswith(".") or suffix == ".": - raise ValueError("Invalid suffix %r" % (suffix)) - name = self.name - if not name: - raise ValueError(f"{self!r} has an empty name") - old_suffix = self.suffix - if not old_suffix: - name = name + suffix - else: - name = name[: -len(old_suffix)] + suffix - return self._from_parsed_parts( - self.drive, self.root, self._tail[:-1] + [name] - ) - - def relative_to(self, other, /, *_deprecated, walk_up=False): - if _deprecated: - msg = ( - "support for supplying more than one positional argument " - "to pathlib.PurePath.relative_to() is deprecated and " - "scheduled for removal in Python 3.14" - ) - warnings.warn( - f"pathlib.PurePath.relative_to(*args) {msg}", - DeprecationWarning, - stacklevel=2, - ) - other = self.with_segments(other, *_deprecated) - for step, path in enumerate([other] + list(other.parents)): # noqa: B007 - if self.is_relative_to(path): - break - elif not walk_up: - raise ValueError( - f"{str(self)!r} is not in the subpath of {str(other)!r}" - ) - elif path.name == "..": - raise ValueError(f"'..' segment in {str(other)!r} cannot be walked") - else: - raise ValueError( - f"{str(self)!r} and {str(other)!r} have different anchors" - ) - parts = [".."] * step + self._tail[len(path._tail) :] - return self.with_segments(*parts) - - def is_relative_to(self, other, /, *_deprecated): - if _deprecated: - msg = ( - "support for supplying more than one argument to " - "pathlib.PurePath.is_relative_to() is deprecated and " - "scheduled for removal in Python 3.14" - ) - warnings.warn( - f"pathlib.PurePath.is_relative_to(*args) {msg}", - DeprecationWarning, - stacklevel=2, - ) - other = self.with_segments(other, *_deprecated) - return other == self or other in self.parents - - @property - def parts(self): - if self.drive or self.root: - return (self.drive + self.root,) + tuple(self._tail) - else: - return tuple(self._tail) - - @property - def parent(self): - drv = self.drive - root = self.root - tail = self._tail - if not tail: - return self - return self._from_parsed_parts(drv, root, tail[:-1]) - - @property - def parents(self): - return _PathParents(self) - - def _make_child_relpath(self, name): - path_str = str(self) - tail = self._tail - if tail: - path_str = f"{path_str}{self._flavour.sep}{name}" - elif path_str != ".": - path_str = f"{path_str}{name}" - else: - path_str = name - path = self.with_segments(path_str) - path._str = path_str - path._drv = self.drive - path._root = self.root - path._tail_cached = tail + [name] - return path - - def lchmod(self, mode): - """ - Like chmod(), except if the path points to a symlink, the symlink's - permissions are changed, rather than its target's. - """ - self.chmod(mode, follow_symlinks=False) - - class _PathParents(Sequence): - __slots__ = ("_path", "_drv", "_root", "_tail") - - def __init__(self, path): - self._path = path - self._drv = path.drive - self._root = path.root - self._tail = path._tail - - def __len__(self): - return len(self._tail) - - def __getitem__(self, idx): - if isinstance(idx, slice): - return tuple(self[i] for i in range(*idx.indices(len(self)))) - - if idx >= len(self) or idx < -len(self): - raise IndexError(idx) - if idx < 0: - idx += len(self) - return self._path._from_parsed_parts( - self._drv, self._root, self._tail[: -idx - 1] - ) - - def __repr__(self): - return f"<{type(self._path).__name__}.parents>" - - -RT = TypeVar("RT") -F = Callable[..., RT] - - -def deprecated(*, python_version: tuple[int, ...]) -> Callable[[F], F]: - """marks function as deprecated""" - pyver_str = ".".join(map(str, python_version)) - - def deprecated_decorator(func: F) -> F: - if sys.version_info >= python_version: - - @wraps(func) - def wrapper(*args, **kwargs): - warnings.warn( - f"{func.__name__} is deprecated on py>={pyver_str}", - DeprecationWarning, - stacklevel=2, - ) - return func(*args, **kwargs) - - return wrapper - - else: - return func - - return deprecated_decorator - - -class method_and_classmethod: - """Allow a method to be used as both a method and a classmethod""" - - def __init__(self, method): - self.method = method - - def __get__(self, instance, owner): - if instance is None: - return self.method.__get__(owner) - return self.method.__get__(instance) diff --git a/upath/_flavour.py b/upath/_flavour.py index c3f2edd2..b0fa0366 100644 --- a/upath/_flavour.py +++ b/upath/_flavour.py @@ -5,7 +5,6 @@ import sys import warnings from collections.abc import Mapping -from collections.abc import Sequence from functools import lru_cache from typing import TYPE_CHECKING from typing import Any @@ -14,22 +13,23 @@ from urllib.parse import SplitResult from urllib.parse import urlsplit -if sys.version_info >= (3, 12): - from typing import TypeAlias -else: - TypeAlias = Any - from fsspec.registry import known_implementations from fsspec.registry import registry as _class_registry from fsspec.spec import AbstractFileSystem -from upath._compat import deprecated from upath._flavour_sources import FileSystemFlavourBase from upath._flavour_sources import flavour_registry from upath._protocol import get_upath_protocol from upath._protocol import normalize_empty_netloc +from upath.types import JoinablePath +from upath.types import UPathParser if TYPE_CHECKING: + if sys.version_info >= (3, 12): + from typing import TypeAlias + else: + TypeAlias = Any + from upath.core import UPath __all__ = [ @@ -37,10 +37,11 @@ "default_flavour", "upath_urijoin", "upath_get_kwargs_from_url", + "upath_strip_protocol", ] class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry -PathOrStr: TypeAlias = Union[str, "os.PathLike[str]"] +PathOrStr: TypeAlias = Union[str, os.PathLike[str], JoinablePath] class AnyProtocolFileSystemFlavour(FileSystemFlavourBase): @@ -79,7 +80,7 @@ class ProtocolConfig(TypedDict): root_marker_override: dict[str, str] -class WrappedFileSystemFlavour: # (pathlib_abc.FlavourBase) +class WrappedFileSystemFlavour(UPathParser): # (pathlib_abc.FlavourBase) """flavour class for universal_pathlib **INTERNAL AND VERY MUCH EXPERIMENTAL** @@ -115,8 +116,6 @@ class WrappedFileSystemFlavour: # (pathlib_abc.FlavourBase) "adl", "abfs", "abfss", - "webdav+http", - "webdav+https", }, "supports_empty_parts": { "http", @@ -241,6 +240,7 @@ def stringify_path(pth: PathOrStr) -> str: if isinstance(pth, str): out = pth elif getattr(pth, "__fspath__", None) is not None: + assert hasattr(pth, "__fspath__") out = pth.__fspath__() elif isinstance(pth, os.PathLike): out = str(pth) @@ -268,11 +268,11 @@ def parent(self, path: PathOrStr) -> str: # === pathlib_abc.FlavourBase ===================================== @property - def sep(self) -> str: + def sep(self) -> str: # type: ignore[override] return self._spec.sep @property - def altsep(self) -> str | None: + def altsep(self) -> str | None: # type: ignore[override] return None def isabs(self, path: PathOrStr) -> bool: @@ -283,6 +283,13 @@ def isabs(self, path: PathOrStr) -> bool: return path.startswith(self.root_marker) def join(self, path: PathOrStr, *paths: PathOrStr) -> str: + if not paths: + return self.strip_protocol(path) or self.root_marker + if self.local_file: + return os.path.join( + self.strip_protocol(path), + *paths, # type: ignore[arg-type] + ) if self.netloc_is_anchor: drv, p0 = self.splitdrive(path) pN = list(map(self.stringify_path, paths)) @@ -301,11 +308,22 @@ def join(self, path: PathOrStr, *paths: PathOrStr) -> str: def split(self, path: PathOrStr): stripped_path = self.strip_protocol(path) + if self.local_file: + return os.path.split(stripped_path) head = self.parent(stripped_path) or self.root_marker - if head: - return head, stripped_path[len(head) + 1 :] + if head == self.sep: + tail = stripped_path[1:] + elif head: + tail = stripped_path[len(head) + 1 :] else: - return "", stripped_path + tail = stripped_path + if ( + not tail + and not self.has_meaningful_trailing_slash + and self.strip_protocol(head) != stripped_path + ): + return self.split(head) + return head, tail def splitdrive(self, path: PathOrStr) -> tuple[str, str]: path = self.strip_protocol(path) @@ -338,6 +356,20 @@ def normcase(self, path: PathOrStr) -> str: else: return self.stringify_path(path) + def splitext(self, path: PathOrStr) -> tuple[str, str]: + path = self.stringify_path(path) + if self.local_file: + return os.path.splitext(path) + else: + path, sep, name = path.rpartition(self.sep) + if name: + stem, dot, ext = name.rpartition(".") + suffix = dot + ext + else: + stem = name + suffix = "" + return path + sep + stem, suffix + # === Python3.12 pathlib flavour ================================== def splitroot(self, path: PathOrStr) -> tuple[str, str, str]: @@ -348,54 +380,6 @@ def splitroot(self, path: PathOrStr) -> tuple[str, str, str]: root_marker = self.root_marker return drive, root_marker, tail.removeprefix(self.sep) - # === deprecated backwards compatibility =========================== - - @deprecated(python_version=(3, 12)) - def casefold(self, s: str) -> str: - if self.local_file: - return s - else: - return s.lower() - - @deprecated(python_version=(3, 12)) - def parse_parts(self, parts: Sequence[str]) -> tuple[str, str, list[str]]: - parsed = [] - sep = self.sep - drv = root = "" - it = reversed(parts) - for part in it: - if part: - drv, root, rel = self.splitroot(part) - if not root or root and rel: - for x in reversed(rel.split(sep)): - parsed.append(sys.intern(x)) - if drv or root: - parsed.append(drv + root) - parsed.reverse() - return drv, root, parsed - - @deprecated(python_version=(3, 12)) - def join_parsed_parts( - self, - drv: str, - root: str, - parts: list[str], - drv2: str, - root2: str, - parts2: list[str], - ) -> tuple[str, str, list[str]]: - if root2: - if not drv2 and drv: - return drv, root2, [drv + root2] + parts2[1:] - elif drv2: - if drv2 == drv or self.casefold(drv2) == self.casefold(drv): - # Same drive => second path is relative to the first - return drv, root, parts + parts2[1:] - else: - # Second path is non-anchored (common case) - return drv, root, parts + parts2 - return drv2, root2, parts2 - default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour) @@ -415,9 +399,11 @@ def __set_name__(self, owner: type[UPath], name: str) -> None: except (AttributeError, IndexError): self._default_protocol = None - def __get__(self, instance: UPath, owner: type[UPath]) -> WrappedFileSystemFlavour: - if instance is not None: - return WrappedFileSystemFlavour.from_protocol(instance.protocol) + def __get__( + self, obj: UPath | None, objtype: type[UPath] | None = None + ) -> WrappedFileSystemFlavour: + if obj is not None: + return WrappedFileSystemFlavour.from_protocol(obj.protocol) elif self._default_protocol: # type: ignore return WrappedFileSystemFlavour.from_protocol(self._default_protocol) else: diff --git a/upath/_protocol.py b/upath/_protocol.py index d333dd6a..a8897ba7 100644 --- a/upath/_protocol.py +++ b/upath/_protocol.py @@ -7,7 +7,7 @@ from typing import Any if TYPE_CHECKING: - from upath.core import UPath + from upath.types import JoinablePath __all__ = [ "get_upath_protocol", @@ -34,14 +34,18 @@ def _match_protocol(pth: str) -> str: def get_upath_protocol( - pth: str | PurePath | os.PathLike, + pth: str | os.PathLike[str] | PurePath | JoinablePath, *, protocol: str | None = None, storage_options: dict[str, Any] | None = None, ) -> str: """return the filesystem spec protocol""" + from upath.core import UPath + if isinstance(pth, str): pth_protocol = _match_protocol(pth) + elif isinstance(pth, UPath): + pth_protocol = pth.protocol elif isinstance(pth, PurePath): pth_protocol = getattr(pth, "protocol", "") elif hasattr(pth, "__fspath__"): @@ -66,7 +70,10 @@ def normalize_empty_netloc(pth: str) -> str: return pth -def compatible_protocol(protocol: str, *args: str | os.PathLike[str] | UPath) -> bool: +def compatible_protocol( + protocol: str, + *args: str | os.PathLike[str] | PurePath | JoinablePath, +) -> bool: """check if UPath protocols are compatible""" for arg in args: other_protocol = get_upath_protocol(arg) diff --git a/upath/core.py b/upath/core.py index 26e122da..a30bfb31 100644 --- a/upath/core.py +++ b/upath/core.py @@ -3,11 +3,12 @@ import os import sys import warnings -from collections.abc import Generator +from abc import ABCMeta +from abc import abstractmethod +from collections.abc import Iterator from collections.abc import Mapping from collections.abc import Sequence from copy import copy -from pathlib import Path from types import MappingProxyType from typing import IO from typing import TYPE_CHECKING @@ -16,13 +17,12 @@ from typing import Literal from typing import TextIO from typing import overload +from urllib.parse import SplitResult from urllib.parse import urlsplit from fsspec.registry import get_filesystem_class from fsspec.spec import AbstractFileSystem -from upath._compat import PathlibPathShim -from upath._compat import method_and_classmethod from upath._flavour import LazyFlavourDescriptor from upath._flavour import upath_get_kwargs_from_url from upath._flavour import upath_urijoin @@ -30,26 +30,22 @@ from upath._protocol import get_upath_protocol from upath._stat import UPathStatResult from upath.registry import get_upath_class +from upath.types import UNSET_DEFAULT +from upath.types import JoinablePathLike +from upath.types import OpenablePath +from upath.types import PathInfo +from upath.types import ReadablePathLike +from upath.types import UPathParser +from upath.types import WritablePathLike if TYPE_CHECKING: - from urllib.parse import SplitResult - if sys.version_info >= (3, 11): from typing import Self else: from typing_extensions import Self -__all__ = ["UPath"] - -def __getattr__(name): - if name in {"_UriFlavour", "_FSSpecAccessor", "PT"}: - warnings.warn( - f"upath.core.{name} has been removed.", - UserWarning, - stacklevel=2, - ) - raise AttributeError(name) +__all__ = ["UPath"] _FSSPEC_HAS_WORKING_GLOB = None @@ -71,53 +67,176 @@ def _make_instance(cls, args, kwargs): return cls(*args, **kwargs) -_unset: Any = object() +def _explode_path(path, parser): + split = parser.split + path = parser.strip_protocol(path) + parent, name = parser.split(path) + names = [] + while path != parent: + names.append(name) + path = parent + parent, name = split(path) + return path, names + + +def _buffering2blocksize(mode: str, buffering: int) -> int | None: + if not isinstance(buffering, int): + raise TypeError("buffering must be an integer") + if buffering == 0: # buffering disabled + if "b" not in mode: # text mode + raise ValueError("can't have unbuffered text I/O") + return buffering + elif buffering == -1: + return None + else: + return buffering -class UPath(PathlibPathShim, Path): - __slots__ = ( - "_protocol", - "_storage_options", - "_fs_cached", - *PathlibPathShim.__missing_py312_slots__, - "__drv", - "__root", - "__parts", - ) +if sys.version_info >= (3, 11): + _UPathMeta = ABCMeta - if TYPE_CHECKING: - # public - anchor: str - drive: str - parent: Self - parents: Sequence[Self] - parts: tuple[str, ...] - root: str - stem: str - suffix: str - suffixes: list[str] - - def with_name(self, name: str) -> Self: ... - def with_stem(self, stem: str) -> Self: ... - def with_suffix(self, suffix: str) -> Self: ... - - # private attributes - _protocol: str - _storage_options: dict[str, Any] - _fs_cached: AbstractFileSystem - _tail: str +else: - _protocol_dispatch: bool | None = None - _flavour = LazyFlavourDescriptor() + class _UPathMeta(ABCMeta): + # pathlib 3.9 and 3.10 supported `Path[str]` but + # did not return a GenericAlias but the class itself? + def __getitem__(cls, key): + return cls - if sys.version_info >= (3, 13): - parser = _flavour + +class _UPathMixin(metaclass=_UPathMeta): + __slots__ = () + + @property + @abstractmethod + def parser(self) -> UPathParser: + raise NotImplementedError + + @property + @abstractmethod + def _protocol(self) -> str: + raise NotImplementedError + + @_protocol.setter + def _protocol(self, value: str) -> None: + raise NotImplementedError + + @property + @abstractmethod + def _storage_options(self) -> dict[str, Any]: + raise NotImplementedError + + @_storage_options.setter + def _storage_options(self, value: dict[str, Any]) -> None: + raise NotImplementedError + + @property + @abstractmethod + def _fs_cached(self) -> AbstractFileSystem: + raise NotImplementedError + + @_fs_cached.setter + def _fs_cached(self, value: AbstractFileSystem): + raise NotImplementedError + + @property + @abstractmethod + def _raw_urlpaths(self) -> Sequence[JoinablePathLike]: + raise NotImplementedError + + @_raw_urlpaths.setter + def _raw_urlpaths(self, value: Sequence[JoinablePathLike]) -> None: + raise NotImplementedError + + # === upath.UPath PUBLIC ADDITIONAL API =========================== + + @property + def protocol(self) -> str: + """The fsspec protocol for the path.""" + return self._protocol + + @property + def storage_options(self) -> Mapping[str, Any]: + """The fsspec storage options for the path.""" + return MappingProxyType(self._storage_options) + + @property + def fs(self) -> AbstractFileSystem: + """The cached fsspec filesystem instance for the path.""" + try: + return self._fs_cached + except AttributeError: + fs = self._fs_cached = self._fs_factory( + str(self), self.protocol, self.storage_options + ) + return fs + + @property + def path(self) -> str: + """The path that a fsspec filesystem can use.""" + return self.parser.strip_protocol(self.__str__()) + + def joinuri(self, uri: JoinablePathLike) -> UPath: + """Join with urljoin behavior for UPath instances""" + # short circuit if the new uri uses a different protocol + other_protocol = get_upath_protocol(uri) + if other_protocol and other_protocol != self._protocol: + return UPath(uri) + return UPath( + upath_urijoin(str(self), str(uri)), + protocol=other_protocol or self._protocol, + **self.storage_options, + ) + + # === upath.UPath CUSTOMIZABLE API ================================ + + @classmethod + def _transform_init_args( + cls, + args: tuple[JoinablePathLike, ...], + protocol: str, + storage_options: dict[str, Any], + ) -> tuple[tuple[JoinablePathLike, ...], str, dict[str, Any]]: + """allow customization of init args in subclasses""" + return args, protocol, storage_options + + @classmethod + def _parse_storage_options( + cls, + urlpath: str, + protocol: str, + storage_options: Mapping[str, Any], + ) -> dict[str, Any]: + """Parse storage_options from the urlpath""" + pth_storage_options = upath_get_kwargs_from_url(urlpath) + return {**pth_storage_options, **storage_options} + + @classmethod + def _fs_factory( + cls, + urlpath: str, + protocol: str, + storage_options: Mapping[str, Any], + ) -> AbstractFileSystem: + """Instantiate the filesystem_spec filesystem class""" + fs_cls = get_filesystem_class(protocol) + so_dct = fs_cls._get_kwargs_from_urls(urlpath) + so_dct.update(storage_options) + return fs_cls(**storage_options) # === upath.UPath constructor ===================================== + _protocol_dispatch: bool | None = None + def __new__( - cls, *args, protocol: str | None = None, **storage_options: Any + cls, + *args: JoinablePathLike, + protocol: str | None = None, + **storage_options: Any, ) -> UPath: + # narrow type + assert issubclass(cls, UPath), "_UPathMixin should never be instantiated" + # fill empty arguments if not args: args = (".",) @@ -162,6 +281,11 @@ def __new__( obj: UPath = object.__new__(upath_cls) obj._protocol = pth_protocol + if cls not in upath_cls.mro(): + # we are not in the upath_cls mro, so we need to + # call __init__ of the upath_cls + upath_cls.__init__(obj, *args, protocol=pth_protocol, **storage_options) + elif issubclass(cls, upath_cls): # we called a sub- or sub-sub-class of UPath, i.e. S3Path() and the # corresponding upath_cls based on protocol is equal-to or a @@ -203,7 +327,10 @@ def __new__( return obj def __init__( - self, *args, protocol: str | None = None, **storage_options: Any + self, + *args: JoinablePathLike, + protocol: str | None = None, + **storage_options: Any, ) -> None: # allow subclasses to customize __init__ arg parsing base_options = getattr(self, "_storage_options", {}) @@ -237,490 +364,144 @@ def __init__( if not compatible_protocol(self._protocol, *args): raise ValueError("can't combine incompatible UPath protocols") - # fill ._raw_paths - if hasattr(self, "_raw_paths"): + if hasattr(self, "_raw_urlpaths"): return - super().__init__(*args) - - # === upath.UPath PUBLIC ADDITIONAL API =========================== - - @property - def protocol(self) -> str: - """The fsspec protocol for the path.""" - return self._protocol - - @property - def storage_options(self) -> Mapping[str, Any]: - """The fsspec storage options for the path.""" - return MappingProxyType(self._storage_options) - - @property - def fs(self) -> AbstractFileSystem: - """The cached fsspec filesystem instance for the path.""" - try: - return self._fs_cached - except AttributeError: - fs = self._fs_cached = self._fs_factory( - str(self), self.protocol, self.storage_options - ) - return fs - - @property - def path(self) -> str: - """The path that a fsspec filesystem can use.""" - return super().__str__() - - def joinuri(self, uri: str | os.PathLike[str]) -> UPath: - """Join with urljoin behavior for UPath instances""" - # short circuit if the new uri uses a different protocol - other_protocol = get_upath_protocol(uri) - if other_protocol and other_protocol != self._protocol: - return UPath(uri) - return UPath( - upath_urijoin(str(self), str(uri)), - protocol=other_protocol or self._protocol, - **self.storage_options, - ) - - # === upath.UPath CUSTOMIZABLE API ================================ - - @classmethod - def _transform_init_args( - cls, - args: tuple[str | os.PathLike, ...], - protocol: str, - storage_options: dict[str, Any], - ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]: - """allow customization of init args in subclasses""" - return args, protocol, storage_options - - @classmethod - def _parse_storage_options( - cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] - ) -> dict[str, Any]: - """Parse storage_options from the urlpath""" - pth_storage_options = upath_get_kwargs_from_url(urlpath) - return {**pth_storage_options, **storage_options} - - @classmethod - def _fs_factory( - cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] - ) -> AbstractFileSystem: - """Instantiate the filesystem_spec filesystem class""" - fs_cls = get_filesystem_class(protocol) - so_dct = fs_cls._get_kwargs_from_urls(urlpath) - so_dct.update(storage_options) - return fs_cls(**storage_options) + self._raw_urlpaths = args - # === upath.UPath COMPATIBILITY API =============================== - - def __init_subclass__(cls, **kwargs): - """provide a clean migration path for custom user subclasses""" - - # Check if the user subclass has a custom `__new__` method - has_custom_new_method = ( - cls.__new__ is not UPath.__new__ - and cls.__name__ not in {"PosixUPath", "WindowsUPath"} - ) - - if has_custom_new_method and cls._protocol_dispatch is None: - warnings.warn( - "Detected a customized `__new__` method in subclass" - f" {cls.__name__!r}. Protocol dispatch will be disabled" - " for this subclass. Please follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - UserWarning, - stacklevel=2, - ) - cls._protocol_dispatch = False - - @property - def _path(self): - warnings.warn( - "UPath._path is deprecated and should not be used." - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - return self.path - - @property - def _kwargs(self): - warnings.warn( - "UPath._kwargs is deprecated. Please use" - " UPath.storage_options instead. Follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - return self.storage_options + # --- deprecated attributes --------------------------------------- + # deprecation @property def _url(self) -> SplitResult: # TODO: # _url should be deprecated, but for now there is no good way of # accessing query parameters from urlpaths... - return urlsplit(self.as_posix()) - - if not TYPE_CHECKING: - # allow mypy to catch missing attributes + return urlsplit(self.__str__()) - def __getattr__(self, item): - if item == "_accessor": - warnings.warn( - "UPath._accessor has been removed.", - UserWarning, - stacklevel=2, - ) - raise AttributeError(item) - - @classmethod - def _from_parts(cls, parts, **kwargs): - warnings.warn( - "UPath._from_parts is deprecated and should not be used." - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - parsed_url = kwargs.pop("url", None) - if parsed_url: - if protocol := parsed_url.scheme: - kwargs["protocol"] = protocol - if netloc := parsed_url.netloc: - kwargs["netloc"] = netloc - obj = UPath.__new__(cls, parts, **kwargs) - obj.__init__(*parts, **kwargs) - return obj - - @classmethod - def _parse_args(cls, args): - warnings.warn( - "UPath._parse_args is deprecated and should not be used." - " Please follow the universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - # TODO !!! - pth = cls._flavour.join(*args) - return cls._parse_path(pth) - @property - def _drv(self): - # direct access to ._drv should emit a warning, - # but there is no good way of doing this for now... - try: - return self.__drv - except AttributeError: - self._load_parts() - return self.__drv - - @_drv.setter - def _drv(self, value): - self.__drv = value - - @property - def _root(self): - # direct access to ._root should emit a warning, - # but there is no good way of doing this for now... - try: - return self.__root - except AttributeError: - self._load_parts() - return self.__root - - @_root.setter - def _root(self, value): - self.__root = value - - @property - def _parts(self): - # UPath._parts is not used anymore, and not available - # in pathlib.Path for Python 3.12 and later. - # Direct access to ._parts should emit a deprecation warning, - # but there is no good way of doing this for now... - try: - return self.__parts - except AttributeError: - self._load_parts() - self.__parts = super().parts - return list(self.__parts) - - @_parts.setter - def _parts(self, value): - self.__parts = value +class UPath(_UPathMixin, OpenablePath): + __slots__ = ( + "_protocol", + "_storage_options", + "_fs_cached", + "_raw_urlpaths", + ) - @property - def _cparts(self): - # required for pathlib.Path.__eq__ compatibility on Python <3.12 - return self.parts + if TYPE_CHECKING: + _protocol: str + _storage_options: dict[str, Any] + _fs_cached: bool + _raw_urlpaths: Sequence[JoinablePathLike] - # === pathlib.PurePath ============================================ + # === JoinablePath attributes ===================================== - def __reduce__(self): - args = tuple(self._raw_paths) - kwargs = { - "protocol": self._protocol, - **self._storage_options, - } - return _make_instance, (type(self), args, kwargs) + parser: UPathParser = LazyFlavourDescriptor() # type: ignore[assignment] - def with_segments(self, *pathsegments: str | os.PathLike[str]) -> Self: + def with_segments(self, *pathsegments: JoinablePathLike) -> Self: return type(self)( *pathsegments, protocol=self._protocol, **self._storage_options, ) - def joinpath(self, *pathsegments: str | os.PathLike[str]) -> Self: - return self.with_segments(self, *pathsegments) - - def __truediv__(self, key: str | os.PathLike[str]) -> Self: - try: - return self.joinpath(key) - except TypeError: - return NotImplemented - - def __rtruediv__(self, key: str | os.PathLike[str]) -> Self: - try: - return self.with_segments(key, self) - except TypeError: - return NotImplemented - - # === upath.UPath non-standard changes ============================ - - # NOTE: - # this is a classmethod on the parent class, but we need to - # override it here to make it possible to provide the _flavour - # with the correct protocol... - # pathlib 3.12 never calls this on the class. Only on the instance. - @method_and_classmethod - def _parse_path(self_or_cls, path): # noqa: B902 - if isinstance(self_or_cls, type): - warnings.warn( - "UPath._parse_path should not be used as a classmethod." - " Please file an issue on the universal_pathlib issue tracker" - " and describe your use case.", - DeprecationWarning, - stacklevel=2, - ) - flavour = self_or_cls._flavour - - if flavour.supports_empty_parts: - drv, root, rel = flavour.splitroot(path) - if not root: - parsed = [] - else: - parsed = list(map(sys.intern, rel.split(flavour.sep))) - if parsed[-1] == ".": - parsed[-1] = "" - parsed = [x for x in parsed if x != "."] - if not flavour.has_meaningful_trailing_slash and parsed[-1] == "": - parsed.pop() - return drv, root, parsed - if not path: - return "", "", [] - sep = flavour.sep - altsep = flavour.altsep - if altsep: - path = path.replace(altsep, sep) - drv, root, rel = flavour.splitroot(path) - if not root and drv.startswith(sep) and not drv.endswith(sep): - drv_parts = drv.split(sep) - if len(drv_parts) == 4 and drv_parts[2] not in "?.": - # e.g. //server/share - root = sep - elif len(drv_parts) == 6: - # e.g. //?/unc/server/share - root = sep - parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] - return drv, root, parsed - - @method_and_classmethod - def _format_parsed_parts(self_or_cls, drv, root, tail, **kwargs): # noqa: B902 - if isinstance(self_or_cls, type): - warnings.warn( - "UPath._format_parsed_path should not be used as a classmethod." - " Please file an issue on the universal_pathlib issue tracker" - " and describe your use case.", - DeprecationWarning, - stacklevel=2, - ) - flavour = self_or_cls._flavour - - if kwargs: - warnings.warn( - "UPath._format_parsed_parts should not be used with" - " additional kwargs. Please follow the" - " universal_pathlib==0.2.0 migration guide at" - " https://github.com/fsspec/universal_pathlib for more" - " information.", - DeprecationWarning, - stacklevel=2, - ) - if "url" in kwargs and tail[:1] == [f"{drv}{root}"]: - # This was called from code that expected py38-py311 behavior - # of _format_parsed_parts, which takes drv, root and parts - tail = tail[1:] - - if drv or root: - return drv + root + flavour.sep.join(tail) - elif tail and flavour.splitdrive(tail[0])[0]: - tail = ["."] + tail - return flavour.sep.join(tail) - - # === upath.UPath changes ========================================= - - def __str__(self): + def __str__(self) -> str: + path = self.parser.join(*self._raw_urlpaths) if self._protocol: - return f"{self._protocol}://{self.path}" - else: - return self.path - - def __fspath__(self): - msg = ( - "in a future version of UPath this will be set to None" - " unless the filesystem is local (or caches locally)" - ) - warnings.warn(msg, PendingDeprecationWarning, stacklevel=2) - return str(self) - - def __bytes__(self): - msg = ( - "in a future version of UPath this will be set to None" - " unless the filesystem is local (or caches locally)" - ) - warnings.warn(msg, PendingDeprecationWarning, stacklevel=2) - return os.fsencode(self) - - def as_uri(self) -> str: - return str(self) - - def is_reserved(self) -> bool: - return False - - def __eq__(self, other: object) -> bool: - """UPaths are considered equal if their protocol, path and - storage_options are equal.""" - if not isinstance(other, UPath): - return NotImplemented - return ( - self.path == other.path - and self.protocol == other.protocol - and self.storage_options == other.storage_options - ) - - def __hash__(self) -> int: - """The returned hash is based on the protocol and path only. - - Note: in the future, if hash collisions become an issue, we - can add `fsspec.utils.tokenize(storage_options)` - """ - return hash((self.protocol, self.path)) + if path.startswith(f"{self._protocol}://"): + return path + elif path.startswith(f"{self._protocol}:/"): + return path.replace(":/", "://", 1) + else: + return f"{self._protocol}://{path}" + return path - def relative_to( # type: ignore[override] - self, - other, - /, - *_deprecated, - walk_up=False, - ) -> Self: - if isinstance(other, UPath) and self.storage_options != other.storage_options: - raise ValueError( - "paths have different storage_options:" - f" {self.storage_options!r} != {other.storage_options!r}" - ) - return super().relative_to(other, *_deprecated, walk_up=walk_up) + def __repr__(self) -> str: + return f"{type(self).__name__}({self.path!r}, protocol={self._protocol!r})" - def is_relative_to(self, other, /, *_deprecated) -> bool: # type: ignore[override] - if isinstance(other, UPath) and self.storage_options != other.storage_options: - return False - return super().is_relative_to(other, *_deprecated) + # === JoinablePath overrides ====================================== @property - def name(self) -> str: - tail = self._tail - if not tail: - return "" - name = tail[-1] - if not name and len(tail) >= 2: - return tail[-2] - else: - return name - - # === pathlib.Path ================================================ - - def stat( # type: ignore[override] - self, - *, - follow_symlinks=True, - ) -> UPathStatResult: - if not follow_symlinks: - warnings.warn( - f"{type(self).__name__}.stat(follow_symlinks=False):" - " is currently ignored.", - UserWarning, - stacklevel=2, - ) - return UPathStatResult.from_info(self.fs.stat(self.path)) - - def lstat(self) -> UPathStatResult: # type: ignore[override] - return self.stat(follow_symlinks=False) - - def exists(self, *, follow_symlinks=True) -> bool: - return self.fs.exists(self.path) - - def is_dir(self) -> bool: - return self.fs.isdir(self.path) - - def is_file(self) -> bool: - return self.fs.isfile(self.path) - - def is_mount(self) -> bool: - return False + def parts(self) -> Sequence[str]: + anchor, parts = _explode_path(str(self), self.parser) + if anchor: + parts.append(anchor) + return tuple(reversed(parts)) + + def with_name(self, name) -> Self: + """Return a new path with the file name changed.""" + split = self.parser.split + if self.parser.sep in name: # `split(name)[0]` + raise ValueError(f"Invalid name {name!r}") + path = str(self) + path = path.removesuffix(split(path)[1]) + name + return self.with_segments(path) + + # === ReadablePath attributes ===================================== - def is_symlink(self) -> bool: - try: - info = self.fs.info(self.path) - if "islink" in info: - return bool(info["islink"]) - except FileNotFoundError: - return False - return False + @property + def info(self) -> PathInfo: + raise NotImplementedError("todo") + + def iterdir(self) -> Iterator[Self]: + sep = self.parser.sep + base = self + if self.parts[-1:] == ("",): + base = self.parent + for name in base.fs.listdir(base.path): + # fsspec returns dictionaries + if isinstance(name, dict): + name = name.get("name") + if name in {".", ".."}: + # Yielding a path object for these makes little sense + continue + # only want the path name with iterdir + _, _, name = name.removesuffix(sep).rpartition(self.parser.sep) + yield base.with_segments(str(base), name) - def is_junction(self) -> bool: - return False + def __open_rb__(self, buffering=-1) -> BinaryIO: + block_size = _buffering2blocksize("wb", buffering) + kw = {} + if block_size is not None: + kw["block_size"] = block_size + return self.fs.open(self.path, mode="rb", **kw) - def is_block_device(self) -> bool: - return False + def readlink(self) -> Self: + raise NotImplementedError - def is_char_device(self) -> bool: - return False + # --- WritablePath attributes ------------------------------------- - def is_fifo(self) -> bool: - return False + def symlink_to( # type: ignore[override] + self, + target: str | os.PathLike[str] | UPath, + target_is_directory: bool = False, + ) -> None: + raise NotImplementedError - def is_socket(self) -> bool: - return False + def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: + if parents and not exist_ok and self.exists(): + raise FileExistsError(str(self)) + try: + self.fs.mkdir( + self.path, + create_parents=parents, + mode=mode, + ) + except FileExistsError: + if not exist_ok: + raise FileExistsError(str(self)) + if not self.is_dir(): + raise FileExistsError(str(self)) - def samefile(self, other_path) -> bool: - st = self.stat() - if isinstance(other_path, UPath): - other_st = other_path.stat() - else: - other_st = self.with_segments(other_path).stat() - return st == other_st + def __open_wb__(self, buffering=-1) -> BinaryIO: + block_size = _buffering2blocksize("wb", buffering) + kw = {} + if block_size is not None: + kw["block_size"] = block_size + return self.fs.open(self.path, mode="wb", **kw) + + # --- upath overrides --------------------------------------------- - @overload # type: ignore[override] + @overload def open( self, mode: Literal["r", "w", "a"] = "r", @@ -732,7 +513,7 @@ def open( ) -> TextIO: ... @overload - def open( # type: ignore[override] + def open( self, mode: Literal["rb", "wb", "ab"], buffering: int = ..., @@ -776,44 +557,128 @@ def open( fsspec_kwargs[key] = value # translate pathlib buffering to fs block_size if "buffering" in fsspec_kwargs: - fsspec_kwargs.setdefault("block_size", fsspec_kwargs.pop("buffering")) + if "block_size" in fsspec_kwargs: + raise TypeError("cannot specify both 'buffering' and 'block_size'") + block_size = _buffering2blocksize(mode, fsspec_kwargs.pop("buffering")) + if block_size is not None: + fsspec_kwargs.setdefault("block_size", block_size) return self.fs.open(self.path, mode=mode, **fsspec_kwargs) - def iterdir(self) -> Generator[UPath]: - for name in self.fs.listdir(self.path): - # fsspec returns dictionaries - if isinstance(name, dict): - name = name.get("name") - if name in {".", ".."}: - # Yielding a path object for these makes little sense - continue - # only want the path name with iterdir - _, _, name = name.removesuffix("/").rpartition(self._flavour.sep) - yield self.with_segments(*self.parts, name) + # === pathlib.Path ================================================ - def _scandir(self): - raise NotImplementedError # todo + def stat( + self, + *, + follow_symlinks=True, + ) -> UPathStatResult: + if not follow_symlinks: + warnings.warn( + f"{type(self).__name__}.stat(follow_symlinks=False):" + " is currently ignored.", + UserWarning, + stacklevel=2, + ) + return UPathStatResult.from_info(self.fs.info(self.path)) - def _make_child_relpath(self, name): - path = super()._make_child_relpath(name) - del path._str # fix _str = str(self) assignment - return path + def lstat(self) -> UPathStatResult: + return self.stat(follow_symlinks=False) + + def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: + raise NotImplementedError + + def exists(self, *, follow_symlinks=True) -> bool: + return self.fs.exists(self.path) - def glob(self, pattern: str, *, case_sensitive=None) -> Generator[UPath]: + def is_dir(self) -> bool: + return self.fs.isdir(self.path) + + def is_file(self) -> bool: + return self.fs.isfile(self.path) + + def is_mount(self) -> bool: + return False + + def is_symlink(self) -> bool: + try: + info = self.fs.info(self.path) + if "islink" in info: + return bool(info["islink"]) + except FileNotFoundError: + return False + return False + + def is_junction(self) -> bool: + return False + + def is_block_device(self) -> bool: + return False + + def is_char_device(self) -> bool: + return False + + def is_fifo(self) -> bool: + return False + + def is_socket(self) -> bool: + return False + + def is_reserved(self) -> bool: + return False + + def expanduser(self) -> Self: + return self + + def glob( + self, + pattern: str, + *, + case_sensitive: bool = UNSET_DEFAULT, + recurse_symlinks: bool = UNSET_DEFAULT, + ) -> Iterator[UPath]: + if case_sensitive is not UNSET_DEFAULT: + warnings.warn( + "UPath.glob(): case_sensitive is currently ignored.", + UserWarning, + stacklevel=2, + ) + if recurse_symlinks is not UNSET_DEFAULT: + warnings.warn( + "UPath.glob(): recurse_symlinks is currently ignored.", + UserWarning, + stacklevel=2, + ) path_pattern = self.joinpath(pattern).path - sep = self._flavour.sep + sep = self.parser.sep base = self.fs._strip_protocol(self.path) for name in self.fs.glob(path_pattern): name = name.removeprefix(base).removeprefix(sep) yield self.joinpath(name) - def rglob(self, pattern: str, *, case_sensitive=None) -> Generator[UPath]: + def rglob( + self, + pattern: str, + *, + case_sensitive: bool = UNSET_DEFAULT, + recurse_symlinks: bool = UNSET_DEFAULT, + ) -> Iterator[UPath]: + if case_sensitive is not UNSET_DEFAULT: + warnings.warn( + "UPath.glob(): case_sensitive is currently ignored.", + UserWarning, + stacklevel=2, + ) + if recurse_symlinks is not UNSET_DEFAULT: + warnings.warn( + "UPath.glob(): recurse_symlinks is currently ignored.", + UserWarning, + stacklevel=2, + ) if _FSSPEC_HAS_WORKING_GLOB is None: _check_fsspec_has_working_glob() if _FSSPEC_HAS_WORKING_GLOB: r_path_pattern = self.joinpath("**", pattern).path - sep = self._flavour.sep + sep = self.parser.sep base = self.fs._strip_protocol(self.path) for name in self.fs.glob(r_path_pattern): name = name.removeprefix(base).removeprefix(sep) @@ -822,7 +687,7 @@ def rglob(self, pattern: str, *, case_sensitive=None) -> Generator[UPath]: else: path_pattern = self.joinpath(pattern).path r_path_pattern = self.joinpath("**", pattern).path - sep = self._flavour.sep + sep = self.parser.sep base = self.fs._strip_protocol(self.path) seen = set() for p in (path_pattern, r_path_pattern): @@ -834,25 +699,56 @@ def rglob(self, pattern: str, *, case_sensitive=None) -> Generator[UPath]: seen.add(name) yield self.joinpath(name) - @classmethod - def cwd(cls) -> UPath: - if cls is UPath: - return get_upath_class("").cwd() # type: ignore[union-attr] - else: - raise NotImplementedError + def owner(self) -> str: + raise NotImplementedError - @classmethod - def home(cls) -> UPath: - if cls is UPath: - return get_upath_class("").home() # type: ignore[union-attr] - else: - raise NotImplementedError + def group(self) -> str: + raise NotImplementedError def absolute(self) -> Self: return self def is_absolute(self) -> bool: - return self._flavour.isabs(str(self)) + return self.parser.isabs(str(self)) + + def __eq__(self, other: object) -> bool: + """UPaths are considered equal if their protocol, path and + storage_options are equal.""" + if not isinstance(other, UPath): + return NotImplemented + return ( + self.path == other.path + and self.protocol == other.protocol + and self.storage_options == other.storage_options + ) + + def __hash__(self) -> int: + """The returned hash is based on the protocol and path only. + + Note: in the future, if hash collisions become an issue, we + can add `fsspec.utils.tokenize(storage_options)` + """ + return hash((self.protocol, self.path)) + + def __lt__(self, other: object) -> bool: + if not isinstance(other, UPath) or self.parser is not other.parser: + return NotImplemented + return self.path < other.path + + def __le__(self, other: object) -> bool: + if not isinstance(other, UPath) or self.parser is not other.parser: + return NotImplemented + return self.path <= other.path + + def __gt__(self, other: object) -> bool: + if not isinstance(other, UPath) or self.parser is not other.parser: + return NotImplemented + return self.path > other.path + + def __ge__(self, other: object) -> bool: + if not isinstance(other, UPath) or self.parser is not other.parser: + return NotImplemented + return self.path >= other.path def resolve(self, strict: bool = False) -> Self: _parts = self.parts @@ -872,15 +768,6 @@ def resolve(self, strict: bool = False) -> Self: return self.with_segments(*_parts[:1], *resolved) - def owner(self) -> str: - raise NotImplementedError - - def group(self) -> str: - raise NotImplementedError - - def readlink(self) -> Self: - raise NotImplementedError - def touch(self, mode=0o666, exist_ok=True) -> None: exists = self.fs.exists(self.path) if exists and not exist_ok: @@ -893,24 +780,6 @@ def touch(self, mode=0o666, exist_ok=True) -> None: except (NotImplementedError, ValueError): pass # unsupported by filesystem - def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None: - if parents and not exist_ok and self.exists(): - raise FileExistsError(str(self)) - try: - self.fs.mkdir( - self.path, - create_parents=parents, - mode=mode, - ) - except FileExistsError: - if not exist_ok: - raise FileExistsError(str(self)) - if not self.is_dir(): - raise FileExistsError(str(self)) - - def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: - raise NotImplementedError - def lchmod(self, mode: int) -> None: raise NotImplementedError @@ -930,10 +799,10 @@ def rmdir(self, recursive: bool = True) -> None: # fixme: non-standard def rename( self, - target: str | os.PathLike[str] | UPath, + target: WritablePathLike, *, # note: non-standard compared to pathlib - recursive: bool = _unset, - maxdepth: int | None = _unset, + recursive: bool = UNSET_DEFAULT, + maxdepth: int | None = UNSET_DEFAULT, **kwargs: Any, ) -> Self: if isinstance(target, str) and self.storage_options: @@ -956,11 +825,11 @@ def rename( # avoid calling .resolve for subclasses of UPath if ".." in parent.parts or "." in parent.parts: parent = parent.resolve() - target_ = parent.joinpath(os.path.normpath(target)) + target_ = parent.joinpath(os.path.normpath(str(target))) assert isinstance(target_, type(self)), "identical protocols enforced above" - if recursive is not _unset: + if recursive is not UNSET_DEFAULT: kwargs["recursive"] = recursive - if maxdepth is not _unset: + if maxdepth is not UNSET_DEFAULT: kwargs["maxdepth"] = maxdepth self.fs.mv( self.path, @@ -972,18 +841,77 @@ def rename( def replace(self, target: str | os.PathLike[str] | UPath) -> UPath: raise NotImplementedError # todo - def symlink_to( # type: ignore[override] - self, - target: str | os.PathLike[str] | UPath, - target_is_directory: bool = False, - ) -> None: - raise NotImplementedError + @property + def drive(self) -> str: + return self.parser.splitdrive(str(self))[0] + + @property + def root(self) -> str: + return self.parser.splitroot(str(self))[1] + + def __reduce__(self): + args = tuple(self._raw_urlpaths) + kwargs = { + "protocol": self._protocol, + **self._storage_options, + } + return _make_instance, (type(self), args, kwargs) + + def as_uri(self) -> str: + return str(self) + + def as_posix(self) -> str: + return str(self) + + def samefile(self, other_path) -> bool: + st = self.stat() + if isinstance(other_path, UPath): + other_st = other_path.stat() + else: + other_st = self.with_segments(other_path).stat() + return st == other_st + + @classmethod + def cwd(cls) -> UPath: + if cls is UPath: + return get_upath_class("").cwd() # type: ignore[union-attr] + else: + raise NotImplementedError + + @classmethod + def home(cls) -> UPath: + if cls is UPath: + return get_upath_class("").home() # type: ignore[union-attr] + else: + raise NotImplementedError - def hardlink_to( # type: ignore[override] + def relative_to( # type: ignore[override] self, - target: str | os.PathLike[str] | UPath, - ) -> None: + other, + /, + *_deprecated, + walk_up=False, + ) -> Self: + if isinstance(other, UPath) and ( + (self.__class__ is not other.__class__) + or (self.storage_options != other.storage_options) + ): + raise ValueError( + "paths have different storage_options:" + f" {self.storage_options!r} != {other.storage_options!r}" + ) + return self # super().relative_to(other, *_deprecated, walk_up=walk_up) + + def is_relative_to(self, other, /, *_deprecated) -> bool: # type: ignore[override] + if isinstance(other, UPath) and self.storage_options != other.storage_options: + return False + return self == other or other in self.parents + + def hardlink_to(self, target: ReadablePathLike) -> None: raise NotImplementedError - def expanduser(self) -> Self: - return self + def match(self, pattern: str) -> bool: + # fixme: hacky emulation of match. needs tests... + if not pattern: + raise ValueError("pattern cannot be empty") + return self.full_match(pattern.replace("**", "*")) diff --git a/upath/errors.py b/upath/errors.py deleted file mode 100644 index e7c629a1..00000000 --- a/upath/errors.py +++ /dev/null @@ -1,14 +0,0 @@ -import warnings - - -def __getattr__(name): - """Provide deprecation warning for NotDirectoryError.""" - if name == "NotDirectoryError": - warnings.warn( - "upath.errors.NotDirectoryError is deprecated. " - "Use NotADirectoryError instead", - DeprecationWarning, - stacklevel=2, - ) - return NotADirectoryError - raise AttributeError(name) diff --git a/upath/implementations/cloud.py b/upath/implementations/cloud.py index 3863e47b..865e3e95 100644 --- a/upath/implementations/cloud.py +++ b/upath/implementations/cloud.py @@ -1,10 +1,19 @@ from __future__ import annotations -import os +import sys +from collections.abc import Iterator +from typing import TYPE_CHECKING from typing import Any from upath._flavour import upath_strip_protocol from upath.core import UPath +from upath.types import JoinablePathLike + +if TYPE_CHECKING: + if sys.version_info >= (3, 11): + from typing import Self + else: + from typing_extensions import Self __all__ = [ "CloudPath", @@ -20,10 +29,10 @@ class CloudPath(UPath): @classmethod def _transform_init_args( cls, - args: tuple[str | os.PathLike, ...], + args: tuple[JoinablePathLike, ...], protocol: str, storage_options: dict[str, Any], - ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]: + ) -> tuple[tuple[JoinablePathLike, ...], str, dict[str, Any]]: for key in ["bucket", "netloc"]: bucket = storage_options.pop(key, None) if bucket: @@ -42,13 +51,10 @@ def mkdir( raise FileExistsError(self.path) super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok) - def iterdir(self): + def iterdir(self) -> Iterator[Self]: if self.is_file(): raise NotADirectoryError(str(self)) - if self.parts[-1:] == ("",): - yield from self.parent.iterdir() - else: - yield from super().iterdir() + yield from super().iterdir() def relative_to(self, other, /, *_deprecated, walk_up=False): # use the parent implementation for the ValueError logic @@ -60,7 +66,10 @@ class GCSPath(CloudPath): __slots__ = () def __init__( - self, *args, protocol: str | None = None, **storage_options: Any + self, + *args: JoinablePathLike, + protocol: str | None = None, + **storage_options: Any, ) -> None: super().__init__(*args, protocol=protocol, **storage_options) if not self.drive and len(self.parts) > 1: @@ -80,7 +89,10 @@ class S3Path(CloudPath): __slots__ = () def __init__( - self, *args, protocol: str | None = None, **storage_options: Any + self, + *args: JoinablePathLike, + protocol: str | None = None, + **storage_options: Any, ) -> None: super().__init__(*args, protocol=protocol, **storage_options) if not self.drive and len(self.parts) > 1: @@ -91,7 +103,10 @@ class AzurePath(CloudPath): __slots__ = () def __init__( - self, *args, protocol: str | None = None, **storage_options: Any + self, + *args: JoinablePathLike, + protocol: str | None = None, + **storage_options: Any, ) -> None: super().__init__(*args, protocol=protocol, **storage_options) if not self.drive and len(self.parts) > 1: diff --git a/upath/implementations/data.py b/upath/implementations/data.py index 251a0683..3bc62f74 100644 --- a/upath/implementations/data.py +++ b/upath/implementations/data.py @@ -1,20 +1,23 @@ from __future__ import annotations -import upath.core +from upath.core import UPath -class DataPath(upath.core.UPath): +class DataPath(UPath): @property def parts(self): return (self.path,) def __str__(self): - return self.path + return self.parser.join(*self._raw_urlpaths) def with_segments(self, *pathsegments): raise NotImplementedError("path operation not supported by DataPath") + def with_suffix(self, suffix: str): + raise NotImplementedError("path operation not supported by DataPath") + def mkdir(self, mode=0o777, parents=False, exist_ok=False): raise FileExistsError(str(self)) diff --git a/upath/implementations/http.py b/upath/implementations/http.py index 44275471..9b49cd12 100644 --- a/upath/implementations/http.py +++ b/upath/implementations/http.py @@ -1,14 +1,25 @@ from __future__ import annotations -import os +import sys import warnings +from collections.abc import Iterator +from collections.abc import Sequence from itertools import chain +from typing import TYPE_CHECKING from typing import Any +from urllib.parse import urlsplit from fsspec.asyn import sync from upath._stat import UPathStatResult from upath.core import UPath +from upath.types import JoinablePathLike + +if TYPE_CHECKING: + if sys.version_info >= (3, 11): + from typing import Self + else: + from typing_extensions import Self __all__ = ["HTTPPath"] @@ -18,23 +29,25 @@ class HTTPPath(UPath): @classmethod def _transform_init_args( cls, - args: tuple[str | os.PathLike, ...], + args: tuple[JoinablePathLike, ...], protocol: str, storage_options: dict[str, Any], - ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]: + ) -> tuple[tuple[JoinablePathLike, ...], str, dict[str, Any]]: # allow initialization via a path argument and protocol keyword if args and not str(args[0]).startswith(protocol): args = (f"{protocol}://{str(args[0]).lstrip('/')}", *args[1:]) return args, protocol, storage_options @property - def root(self) -> str: # type: ignore[override] - return super().root or "/" + def parts(self) -> Sequence[str]: + _parts = super().parts + return f"{_parts[0]}/", *_parts[1:] - def __str__(self): - return super(UPath, self).__str__() + def __str__(self) -> str: + sr = urlsplit(super().__str__()) + return sr._replace(path=sr.path or "/").geturl() - def is_file(self): + def is_file(self) -> bool: try: next(super().iterdir()) except (StopIteration, NotADirectoryError): @@ -44,7 +57,7 @@ def is_file(self): else: return False - def is_dir(self): + def is_dir(self) -> bool: try: next(super().iterdir()) except (StopIteration, NotADirectoryError): @@ -54,7 +67,7 @@ def is_dir(self): else: return True - def stat(self, follow_symlinks: bool = True): + def stat(self, follow_symlinks: bool = True) -> UPathStatResult: if not follow_symlinks: warnings.warn( f"{type(self).__name__}.stat(follow_symlinks=False):" @@ -67,33 +80,34 @@ def stat(self, follow_symlinks: bool = True): info["type"] = "directory" if info["url"].endswith("/") else "file" return UPathStatResult.from_info(info) - def iterdir(self): - if self.parts[-1:] == ("",): - yield from self.parent.iterdir() + def iterdir(self) -> Iterator[Self]: + it = iter(super().iterdir()) + try: + item0 = next(it) + except (StopIteration, NotADirectoryError): + raise NotADirectoryError(str(self)) + except FileNotFoundError: + raise FileNotFoundError(str(self)) else: - it = iter(super().iterdir()) - try: - item0 = next(it) - except (StopIteration, NotADirectoryError): - raise NotADirectoryError(str(self)) - except FileNotFoundError: - raise FileNotFoundError(str(self)) - else: - yield from chain([item0], it) + yield from chain([item0], it) def resolve( - self: HTTPPath, + self, strict: bool = False, follow_redirects: bool = True, - ) -> HTTPPath: + ) -> Self: """Normalize the path and resolve redirects.""" - # Normalise the path - resolved_path = super().resolve(strict=strict) - # if the last part is "..", then it's a directory - if self.parts[-1:] == ("..",): - resolved_path = resolved_path.joinpath("") + # special handling of trailing slash behaviour + parts = list(self.parts) + if parts[-1:] == ["."]: + parts[-1:] = [""] + if parts[-2:] == ["", ".."]: + parts[-2:] = [""] + pth = self.with_segments(*parts) + resolved_path = super(HTTPPath, pth).resolve(strict=strict) if follow_redirects: + cls = type(self) # Get the fsspec fs fs = self.fs url = str(self) @@ -108,7 +122,7 @@ def resolve( if method == session.get: raise FileNotFoundError(self) from exc else: - resolved_path = HTTPPath(str(r.url)) + resolved_path = cls(str(r.url)) break return resolved_path diff --git a/upath/implementations/local.py b/upath/implementations/local.py index 223ff988..c2065228 100644 --- a/upath/implementations/local.py +++ b/upath/implementations/local.py @@ -1,27 +1,36 @@ from __future__ import annotations import os +import pathlib import sys -from collections.abc import Collection -from collections.abc import MutableMapping -from inspect import ismemberdescriptor -from pathlib import Path -from pathlib import PosixPath -from pathlib import WindowsPath -from typing import IO +import warnings +from collections.abc import Iterator +from collections.abc import Sequence +from typing import TYPE_CHECKING from typing import Any from urllib.parse import SplitResult +from fsspec import AbstractFileSystem + from upath._protocol import compatible_protocol from upath.core import UPath +from upath.core import _UPathMixin +from upath.types import JoinablePathLike + +if TYPE_CHECKING: + if sys.version_info >= (3, 11): + from typing import Self + else: + from typing_extensions import Self __all__ = [ "LocalPath", - "FilePath", "PosixUPath", "WindowsUPath", + "FilePath", ] + _LISTDIR_WORKS_ON_FILES: bool | None = None @@ -39,210 +48,164 @@ def _check_listdir_works_on_files() -> bool: return w -class LocalPath(UPath): - __slots__ = () - - @property - def path(self): - sep = self._flavour.sep - if self.drive: - return f"/{super().path}".replace(sep, "/") - return super().path.replace(sep, "/") +def _warn_protocol_storage_options( + cls: type, + protocol: str | None, + storage_options: dict[str, Any], +) -> None: + if protocol in {"", None} and not storage_options: + return + warnings.warn( + f"{cls.__name__} on python <= (3, 11) ignores protocol and storage_options", + UserWarning, + stacklevel=3, + ) + + +class LocalPath(_UPathMixin, pathlib.Path): + __slots__ = ( + "_protocol", + "_storage_options", + "_fs_cached", + ) + if TYPE_CHECKING: + _protocol: str + _storage_options: dict[str, Any] + _fs_cached: AbstractFileSystem + + parser = os.path # type: ignore[misc,assignment] @property - def _url(self): - return SplitResult(self.protocol, "", self.path, "", "") - - -class FilePath(LocalPath): - __slots__ = () - - def iterdir(self): - if _LISTDIR_WORKS_ON_FILES is None: - _check_listdir_works_on_files() - if _LISTDIR_WORKS_ON_FILES and self.is_file(): - raise NotADirectoryError(f"{self}") - return super().iterdir() - + def _raw_urlpaths(self) -> Sequence[JoinablePathLike]: + return self.parts -_pathlib_py312_ignore = { - "__slots__", - "__module__", - "__new__", - "__init__", - "_from_parts", - "_from_parsed_parts", - "with_segments", -} + @_raw_urlpaths.setter + def _raw_urlpaths(self, value: Sequence[JoinablePathLike]) -> None: + pass + if sys.version_info >= (3, 12): -def _set_class_attributes( - type_dict: MutableMapping[str, Any], - src: type[Path], - *, - ignore: Collection[str] = frozenset(_pathlib_py312_ignore), -) -> None: - """helper function to assign all methods/attrs from src to a class dict""" - visited = set() - for cls in src.__mro__: - if cls is object: - continue - for attr, func_or_value in cls.__dict__.items(): - if ismemberdescriptor(func_or_value): - continue - if attr in ignore or attr in visited: - continue - else: - visited.add(attr) - - type_dict[attr] = func_or_value - - -def _upath_init(inst: PosixUPath | WindowsUPath) -> None: - """helper to initialize the PosixPath/WindowsPath instance with UPath attrs""" - inst._protocol = "" - inst._storage_options = {} - if sys.version_info < (3, 10) and hasattr(inst, "_init"): - inst._init() - - -class PosixUPath(PosixPath, LocalPath): # type: ignore[misc] - __slots__ = () - - # assign all PosixPath methods/attrs to prevent multi inheritance issues - _set_class_attributes(locals(), src=PosixPath) - - def open( # type: ignore[override] - self, - mode="r", - buffering=-1, - encoding=None, - errors=None, - newline=None, - **fsspec_kwargs, - ) -> IO[Any]: - if fsspec_kwargs: - return super(LocalPath, self).open( - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - **fsspec_kwargs, - ) - else: - return PosixPath.open(self, mode, buffering, encoding, errors, newline) + def __init__( + self, *args, protocol: str | None = None, **storage_options: Any + ) -> None: + super(_UPathMixin, self).__init__(*args) + self._protocol = protocol or "" + self._storage_options = storage_options - if sys.version_info < (3, 12): - - def __new__( - cls, *args, protocol: str | None = None, **storage_options: Any - ) -> PosixUPath: - if os.name == "nt": - raise NotImplementedError( - f"cannot instantiate {cls.__name__} on your system" - ) - if not compatible_protocol("", *args): - raise ValueError("can't combine incompatible UPath protocols") - obj = super().__new__(cls, *args) - obj._protocol = "" - return obj # type: ignore[return-value] + elif sys.version_info >= (3, 10): def __init__( self, *args, protocol: str | None = None, **storage_options: Any ) -> None: - super(Path, self).__init__() - self._drv, self._root, self._parts = type(self)._parse_args(args) - _upath_init(self) - - def _make_child(self, args): - if not compatible_protocol(self._protocol, *args): - raise ValueError("can't combine incompatible UPath protocols") - return super()._make_child(args) + _warn_protocol_storage_options(type(self), protocol, storage_options) + self._drv, self._root, self._parts = self._parse_args(args) # type: ignore[attr-defined] # noqa: E501 + self._protocol = "" + self._storage_options = {} @classmethod - def _from_parts(cls, *args, **kwargs): - obj = super(Path, cls)._from_parts(*args, **kwargs) - _upath_init(obj) + def _from_parts(cls, args): + obj = super()._from_parts(args) + obj._protocol = "" + obj._storage_options = {} return obj @classmethod def _from_parsed_parts(cls, drv, root, parts): - obj = super(Path, cls)._from_parsed_parts(drv, root, parts) - _upath_init(obj) + obj = super()._from_parsed_parts(drv, root, parts) + obj._protocol = "" + obj._storage_options = {} return obj - @property - def path(self) -> str: - return PosixPath.__str__(self) + else: + + def __init__( + self, *args, protocol: str | None = None, **storage_options: Any + ) -> None: + _warn_protocol_storage_options(type(self), protocol, storage_options) + self._drv, self._root, self._parts = self._parse_args(args) # type: ignore[attr-defined] # noqa: E501 + self._init() + + def _init(self, **kwargs: Any) -> None: + super()._init(**kwargs) # type: ignore[misc] + self._protocol = "" + self._storage_options = {} + + def with_segments(self, *pathsegments: str | os.PathLike[str]) -> Self: + return type(self)( + *pathsegments, + protocol=self._protocol, + **self._storage_options, + ) + + @property + def path(self) -> str: + return self.as_posix() + + @property + def _url(self) -> SplitResult: + return SplitResult._make((self.protocol, "", self.path, "", "")) + + def joinpath(self, *other) -> Self: + if not compatible_protocol("", *other): + raise ValueError("can't combine incompatible UPath protocols") + return super().joinpath(*other) + + def __truediv__(self, other) -> Self: + if not compatible_protocol("", other): + raise ValueError("can't combine incompatible UPath protocols") + return super().__truediv__(other) + + def __rtruediv__(self, other) -> Self: + if not compatible_protocol("", other): + raise ValueError("can't combine incompatible UPath protocols") + return super().__rtruediv__(other) + +UPath.register(LocalPath) -class WindowsUPath(WindowsPath, LocalPath): # type: ignore[misc] + +class WindowsUPath(LocalPath, pathlib.WindowsPath): __slots__ = () - # assign all WindowsPath methods/attrs to prevent multi inheritance issues - _set_class_attributes(locals(), src=WindowsPath) - - def open( # type: ignore[override] - self, - mode="r", - buffering=-1, - encoding=None, - errors=None, - newline=None, - **fsspec_kwargs, - ) -> IO[Any]: - if fsspec_kwargs: - return super(LocalPath, self).open( - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - **fsspec_kwargs, + if os.name != "nt": + + def __new__( + cls, *args, protocol: str | None = None, **storage_options: Any + ) -> WindowsUPath: + raise NotImplementedError( + f"cannot instantiate {cls.__name__} on your system" ) - else: - return WindowsPath.open(self, mode, buffering, encoding, errors, newline) - if sys.version_info < (3, 12): + +class PosixUPath(LocalPath, pathlib.PosixPath): + __slots__ = () + + if os.name == "nt": def __new__( cls, *args, protocol: str | None = None, **storage_options: Any - ) -> WindowsUPath: - if os.name != "nt": - raise NotImplementedError( - f"cannot instantiate {cls.__name__} on your system" - ) - if not compatible_protocol("", *args): - raise ValueError("can't combine incompatible UPath protocols") - obj = super().__new__(cls, *args) - obj._protocol = "" - return obj # type: ignore[return-value] + ) -> PosixUPath: + raise NotImplementedError( + f"cannot instantiate {cls.__name__} on your system" + ) - def __init__( - self, *args, protocol: str | None = None, **storage_options: Any - ) -> None: - super(Path, self).__init__() - self._drv, self._root, self._parts = self._parse_args(args) - _upath_init(self) - def _make_child(self, args): - if not compatible_protocol(self._protocol, *args): - raise ValueError("can't combine incompatible UPath protocols") - return super()._make_child(args) +class FilePath(UPath): + __slots__ = () - @classmethod - def _from_parts(cls, *args, **kwargs): - obj = super(Path, cls)._from_parts(*args, **kwargs) - _upath_init(obj) - return obj + def __fspath__(self) -> str: + return self.path - @classmethod - def _from_parsed_parts(cls, drv, root, parts): - obj = super(Path, cls)._from_parsed_parts(drv, root, parts) - _upath_init(obj) - return obj + def iterdir(self) -> Iterator[Self]: + if _LISTDIR_WORKS_ON_FILES is None: + _check_listdir_works_on_files() + elif _LISTDIR_WORKS_ON_FILES and self.is_file(): + raise NotADirectoryError(f"{self}") + return super().iterdir() @property - def path(self) -> str: - return WindowsPath.as_posix(self) + def _url(self) -> SplitResult: + return SplitResult._make((self.protocol, "", self.path, "", "")) + + +LocalPath.register(FilePath) diff --git a/upath/implementations/sftp.py b/upath/implementations/sftp.py index 0c39e3dd..1f1feb8f 100644 --- a/upath/implementations/sftp.py +++ b/upath/implementations/sftp.py @@ -1,7 +1,7 @@ from __future__ import annotations import sys -from collections.abc import Generator +from collections.abc import Iterator from typing import TYPE_CHECKING from typing import Any @@ -19,7 +19,7 @@ class SFTPPath(UPath): __slots__ = () - def iterdir(self) -> Generator[Self]: + def iterdir(self) -> Iterator[Self]: if not self.is_dir(): raise NotADirectoryError(str(self)) else: diff --git a/upath/implementations/smb.py b/upath/implementations/smb.py index 492d738f..055ca2e6 100644 --- a/upath/implementations/smb.py +++ b/upath/implementations/smb.py @@ -1,23 +1,22 @@ from __future__ import annotations -import os import sys import warnings from typing import TYPE_CHECKING from typing import Any +from smbprotocol.exceptions import SMBOSError + +from upath.core import UPath +from upath.types import UNSET_DEFAULT +from upath.types import WritablePathLike + if TYPE_CHECKING: if sys.version_info >= (3, 11): from typing import Self else: from typing_extensions import Self -import smbprotocol.exceptions - -from upath import UPath - -_unset: Any = object() - class SMBPath(UPath): __slots__ = () @@ -31,7 +30,7 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False): self.path, create_parents=parents, ) - except smbprotocol.exceptions.SMBOSError: + except SMBOSError: if not exist_ok: raise FileExistsError(str(self)) if not self.is_dir(): @@ -45,19 +44,19 @@ def iterdir(self): def rename( self, - target: str | os.PathLike[str] | UPath, + target: WritablePathLike, *, - recursive: bool = _unset, - maxdepth: int | None = _unset, + recursive: bool = UNSET_DEFAULT, + maxdepth: int | None = UNSET_DEFAULT, **kwargs: Any, ) -> Self: - if recursive is not _unset: + if recursive is not UNSET_DEFAULT: warnings.warn( "SMBPath.rename(): recursive is currently ignored.", UserWarning, stacklevel=2, ) - if maxdepth is not _unset: + if maxdepth is not UNSET_DEFAULT: warnings.warn( "SMBPath.rename(): maxdepth is currently ignored.", UserWarning, diff --git a/upath/implementations/webdav.py b/upath/implementations/webdav.py index 2c707105..48552651 100644 --- a/upath/implementations/webdav.py +++ b/upath/implementations/webdav.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os from collections.abc import Mapping from typing import Any from urllib.parse import urlsplit @@ -9,6 +8,7 @@ from fsspec.registry import register_implementation from upath.core import UPath +from upath.types import JoinablePathLike __all__ = [ "WebdavPath", @@ -27,10 +27,10 @@ class WebdavPath(UPath): @classmethod def _transform_init_args( cls, - args: tuple[str | os.PathLike, ...], + args: tuple[JoinablePathLike, ...], protocol: str, storage_options: dict[str, Any], - ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]: + ) -> tuple[tuple[JoinablePathLike, ...], str, dict[str, Any]]: if not args: args = ("/",) elif args and protocol in {"webdav+http", "webdav+https"}: @@ -48,7 +48,10 @@ def _transform_init_args( @classmethod def _parse_storage_options( - cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] + cls, + urlpath: str, + protocol: str, + storage_options: Mapping[str, Any], ) -> dict[str, Any]: so = dict(storage_options) if urlpath.startswith(("webdav+http:", "webdav+https:")): @@ -57,12 +60,3 @@ def _parse_storage_options( urlpath = url._replace(scheme="", netloc="").geturl() or "/" so.setdefault("base_url", base) return super()._parse_storage_options(urlpath, "webdav", so) - - @property - def path(self) -> str: - # webdav paths don't start at "/" - return super().path.removeprefix("/") - - def __str__(self): - base_url = self.storage_options["base_url"].removesuffix("/") - return super().__str__().replace("webdav://", f"webdav+{base_url}/", 1) diff --git a/upath/registry.py b/upath/registry.py index 6d129ee6..fa86d4ab 100644 --- a/upath/registry.py +++ b/upath/registry.py @@ -206,11 +206,11 @@ def get_upath_class( if os.name == "nt": from upath.implementations.local import WindowsUPath - return WindowsUPath + return WindowsUPath # type: ignore[return-value] else: from upath.implementations.local import PosixUPath - return PosixUPath + return PosixUPath # type: ignore[return-value] if not fallback: return None try: diff --git a/upath/tests/cases.py b/upath/tests/cases.py index ef1b9f07..4ea493ac 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -418,8 +418,8 @@ def test_pickling_child_path(self): assert path.storage_options == recovered_path.storage_options def test_child_path(self): - path_str = str(self.path).rstrip("/") - path_a = UPath(f"{path_str}/folder") + path_str = str(self.path) + path_a = UPath(path_str, "folder", **self.path.storage_options) path_b = self.path / "folder" assert str(path_a) == str(path_b) @@ -514,19 +514,6 @@ def test_read_with_fsspec(self): with fs.open(path) as f: assert f.read() == b"hello world" - @pytest.mark.xfail( - sys.version_info >= (3, 13), - reason="no support for private `._drv`, `._root`, `._parts` in 3.13", - ) - def test_access_to_private_api(self): - # DO NOT access these private attributes in your code - p = UPath(str(self.path), **self.path.storage_options) - assert isinstance(p._drv, str) - p = UPath(str(self.path), **self.path.storage_options) - assert isinstance(p._root, str) - p = UPath(str(self.path), **self.path.storage_options) - assert isinstance(p._parts, (list, tuple)) - def test_hashable(self): assert hash(self.path) diff --git a/upath/tests/implementations/test_http.py b/upath/tests/implementations/test_http.py index 126eec5c..cd5b5966 100644 --- a/upath/tests/implementations/test_http.py +++ b/upath/tests/implementations/test_http.py @@ -186,3 +186,65 @@ def test_joinuri_behavior(base, rel, expected): pr = p0.joinuri(rel) pe = UPath(expected) assert pr == pe + + +NORMALIZATIONS = ( + ("unnormalized", "normalized"), + ( + # Expected normalization results according to curl + ("http://example.com", "http://example.com/"), + ("http://example.com/", "http://example.com/"), + ("http://example.com/a", "http://example.com/a"), + ("http://example.com//a", "http://example.com//a"), + ("http://example.com///a", "http://example.com///a"), + ("http://example.com////a", "http://example.com////a"), + ("http://example.com/a/.", "http://example.com/a/"), + ("http://example.com/a/./", "http://example.com/a/"), + ("http://example.com/a/./b", "http://example.com/a/b"), + ("http://example.com/a/.//", "http://example.com/a//"), + ("http://example.com/a/.//b", "http://example.com/a//b"), + ("http://example.com/a//.", "http://example.com/a//"), + ("http://example.com/a//./", "http://example.com/a//"), + ("http://example.com/a//./b", "http://example.com/a//b"), + ("http://example.com/a//.//", "http://example.com/a///"), + ("http://example.com/a//.//b", "http://example.com/a///b"), + ("http://example.com/a/..", "http://example.com/"), + ("http://example.com/a/../", "http://example.com/"), + ("http://example.com/a/../.", "http://example.com/"), + ("http://example.com/a/../..", "http://example.com/"), + ("http://example.com/a/../../", "http://example.com/"), + ("http://example.com/a/../..//", "http://example.com//"), + ("http://example.com/a/..//", "http://example.com//"), + ("http://example.com/a/..//.", "http://example.com//"), + ("http://example.com/a/..//..", "http://example.com/"), + ("http://example.com/a/../b", "http://example.com/b"), + ("http://example.com/a/..//b", "http://example.com//b"), + ("http://example.com/a//..", "http://example.com/a/"), + ("http://example.com/a//../", "http://example.com/a/"), + ("http://example.com/a//../.", "http://example.com/a/"), + ("http://example.com/a//../..", "http://example.com/"), + ("http://example.com/a//../../", "http://example.com/"), + ("http://example.com/a//../..//", "http://example.com//"), + ("http://example.com/a//..//..", "http://example.com/a/"), + ("http://example.com/a//../b", "http://example.com/a/b"), + ("http://example.com/a//..//", "http://example.com/a//"), + ("http://example.com/a//..//.", "http://example.com/a//"), + ("http://example.com/a//..//b", "http://example.com/a//b"), + ), +) + + +@pytest.mark.parametrize(*NORMALIZATIONS) +def test_normalize(unnormalized, normalized): + expected = HTTPPath(normalized) + pth = HTTPPath(unnormalized) + assert expected.protocol in {"http", "https"} + assert pth.protocol in {"http", "https"} + + # Normalise only, do not attempt to follow redirects for http:// paths here + result = pth.resolve(strict=True, follow_redirects=False) + + str_expected = str(expected) + str_result = str(result) + assert expected == result + assert str_expected == str_result diff --git a/upath/tests/implementations/test_s3.py b/upath/tests/implementations/test_s3.py index 1b565316..13580dc7 100644 --- a/upath/tests/implementations/test_s3.py +++ b/upath/tests/implementations/test_s3.py @@ -93,10 +93,6 @@ def test_iterdir_with_plus_in_name(self, s3_with_plus_chr_name): (file,) = files assert file == p.joinpath("file.txt") - @pytest.mark.skip - def test_makedirs_exist_ok_false(self): - pass - @pytest.fixture def s3_with_plus_chr_name(s3_server): diff --git a/upath/tests/test_core.py b/upath/tests/test_core.py index bfbf399a..10dcba40 100644 --- a/upath/tests/test_core.py +++ b/upath/tests/test_core.py @@ -3,7 +3,6 @@ import pickle import sys import warnings -from collections.abc import Mapping from urllib.parse import SplitResult import pytest @@ -11,6 +10,12 @@ from upath import UPath from upath.implementations.cloud import GCSPath from upath.implementations.cloud import S3Path +from upath.types import CompatOpenablePath +from upath.types import CompatReadablePath +from upath.types import CompatWritablePath +from upath.types import OpenablePath +from upath.types import ReadablePath +from upath.types import WritablePath from .cases import BaseTests from .utils import only_on_windows @@ -111,20 +116,41 @@ class MyPath(UPath): def test_subclass_with_gcs(): path = UPath("gcs://bucket", anon=True) assert isinstance(path, UPath) - assert isinstance(path, pathlib.Path) + assert isinstance(path, ReadablePath) + assert isinstance(path, WritablePath) + assert isinstance(path, OpenablePath) + assert isinstance(path, CompatReadablePath) + assert isinstance(path, CompatWritablePath) + assert isinstance(path, CompatOpenablePath) + assert not isinstance(path, os.PathLike) + assert not isinstance(path, pathlib.Path) def test_instance_check(local_testdir): - upath = UPath(local_testdir) + path = UPath(local_testdir) # test instance check passes - assert isinstance(upath, pathlib.Path) - assert isinstance(upath, UPath) + assert isinstance(path, UPath) + assert isinstance(path, ReadablePath) + assert isinstance(path, WritablePath) + assert isinstance(path, OpenablePath) + assert isinstance(path, CompatReadablePath) + assert isinstance(path, CompatWritablePath) + assert isinstance(path, CompatOpenablePath) + assert isinstance(path, os.PathLike) + assert isinstance(path, pathlib.Path) def test_instance_check_local_uri(local_testdir): - upath = UPath(f"file://{local_testdir}") - assert isinstance(upath, pathlib.Path) - assert isinstance(upath, UPath) + path = UPath(f"file://{local_testdir}") + assert isinstance(path, UPath) + assert isinstance(path, ReadablePath) + assert isinstance(path, WritablePath) + assert isinstance(path, OpenablePath) + assert isinstance(path, CompatReadablePath) + assert isinstance(path, CompatWritablePath) + assert isinstance(path, CompatOpenablePath) + assert isinstance(path, os.PathLike) + assert not isinstance(path, pathlib.Path) @pytest.mark.xfail(reason="unsupported on universal_pathlib>0.1.4") @@ -286,21 +312,18 @@ def __fspath__(self): ], ) def test_access_to_private_kwargs_and_url(urlpath): + p0 = UPath(urlpath) + assert not hasattr(p0, "_kwargs") + # fixme: this should be deprecated... - pth = UPath(urlpath) - with pytest.warns(DeprecationWarning, match="UPath._kwargs is deprecated"): - assert isinstance(pth._kwargs, Mapping) - with pytest.warns(DeprecationWarning, match="UPath._kwargs is deprecated"): - assert pth._kwargs == {} - assert isinstance(pth._url, SplitResult) - assert pth._url.scheme == "" or pth._url.scheme in pth.fs.protocol - assert pth._url.path == pth.path - subpth = pth / "foo" - with pytest.warns(DeprecationWarning, match="UPath._kwargs is deprecated"): - assert subpth._kwargs == {} - assert isinstance(subpth._url, SplitResult) - assert subpth._url.scheme == "" or subpth._url.scheme in subpth.fs.protocol - assert subpth._url.path == subpth.path + assert isinstance(p0._url, SplitResult) + assert p0._url.scheme == "" or p0._url.scheme in p0.fs.protocol + assert p0._url.path == p0.path + + p1 = p0 / "foo" + assert isinstance(p1._url, SplitResult) + assert p1._url.scheme == "" or p1._url.scheme in p1.fs.protocol + assert p1._url.path == p1.path def test_copy_path_append_kwargs(): @@ -336,45 +359,6 @@ def test_uri_parsing(): NORMALIZATIONS = ( ("unnormalized", "normalized"), ( - # Expected normalization results according to curl - ("http://example.com", "http://example.com/"), - ("http://example.com/", "http://example.com/"), - ("http://example.com/a", "http://example.com/a"), - ("http://example.com//a", "http://example.com//a"), - ("http://example.com///a", "http://example.com///a"), - ("http://example.com////a", "http://example.com////a"), - ("http://example.com/a/.", "http://example.com/a/"), - ("http://example.com/a/./", "http://example.com/a/"), - ("http://example.com/a/./b", "http://example.com/a/b"), - ("http://example.com/a/.//", "http://example.com/a//"), - ("http://example.com/a/.//b", "http://example.com/a//b"), - ("http://example.com/a//.", "http://example.com/a//"), - ("http://example.com/a//./", "http://example.com/a//"), - ("http://example.com/a//./b", "http://example.com/a//b"), - ("http://example.com/a//.//", "http://example.com/a///"), - ("http://example.com/a//.//b", "http://example.com/a///b"), - ("http://example.com/a/..", "http://example.com/"), - ("http://example.com/a/../", "http://example.com/"), - ("http://example.com/a/../.", "http://example.com/"), - ("http://example.com/a/../..", "http://example.com/"), - ("http://example.com/a/../../", "http://example.com/"), - ("http://example.com/a/../..//", "http://example.com//"), - ("http://example.com/a/..//", "http://example.com//"), - ("http://example.com/a/..//.", "http://example.com//"), - ("http://example.com/a/..//..", "http://example.com/"), - ("http://example.com/a/../b", "http://example.com/b"), - ("http://example.com/a/..//b", "http://example.com//b"), - ("http://example.com/a//..", "http://example.com/a/"), - ("http://example.com/a//../", "http://example.com/a/"), - ("http://example.com/a//../.", "http://example.com/a/"), - ("http://example.com/a//../..", "http://example.com/"), - ("http://example.com/a//../../", "http://example.com/"), - ("http://example.com/a//../..//", "http://example.com//"), - ("http://example.com/a//..//..", "http://example.com/a/"), - ("http://example.com/a//../b", "http://example.com/a/b"), - ("http://example.com/a//..//", "http://example.com/a//"), - ("http://example.com/a//..//.", "http://example.com/a//"), - ("http://example.com/a//..//b", "http://example.com/a//b"), # Normalization with and without an authority component ("memory:/a/b/..", "memory://a/"), ("memory:/a/b/.", "memory://a/b/"), @@ -396,11 +380,7 @@ def test_uri_parsing(): def test_normalize(unnormalized, normalized): expected = UPath(normalized) pth = UPath(unnormalized) - if pth.protocol in {"http", "https"}: - # Normalise only, do not attempt to follow redirects for http:// paths here - result = pth.resolve(strict=True, follow_redirects=False) - else: - result = pth.resolve(strict=True) + result = pth.resolve(strict=True) str_expected = str(expected) str_result = str(result) assert expected == result @@ -430,13 +410,13 @@ def test_query_string(uri, query_str): @pytest.mark.parametrize("base,join", PROTOCOL_MISMATCH) def test_joinpath_on_protocol_mismatch(base, join): - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="can't combine incompatible UPath protocols"): UPath(base).joinpath(UPath(join)) @pytest.mark.parametrize("base,join", PROTOCOL_MISMATCH) def test_truediv_on_protocol_mismatch(base, join): - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="can't combine incompatible UPath protocols"): UPath(base) / UPath(join) diff --git a/upath/types/__init__.py b/upath/types/__init__.py index 1492e9e5..7bd42803 100644 --- a/upath/types/__init__.py +++ b/upath/types/__init__.py @@ -1,24 +1,22 @@ from __future__ import annotations +import enum +import os import pathlib import sys from collections.abc import Iterator from collections.abc import Sequence from typing import IO +from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO -from typing import Callable from typing import Literal from typing import Protocol from typing import TextIO +from typing import Union from typing import overload from typing import runtime_checkable -if sys.version_info > (3, 11): - from typing import Self -else: - from typing_extensions import Self - from pathlib_abc import magic_open from upath.types._abc import JoinablePath @@ -27,11 +25,25 @@ from upath.types._abc import ReadablePath from upath.types._abc import WritablePath +if TYPE_CHECKING: + if sys.version_info > (3, 11): + from typing import Self + else: + from typing_extensions import Self + + if sys.version_info >= (3, 12): + from typing import TypeAlias + else: + TypeAlias = Any + __all__ = [ "JoinablePath", "ReadablePath", "WritablePath", "OpenablePath", + "JoinablePathLike", + "ReadablePathLike", + "WritablePathLike", "CompatJoinablePath", "CompatReadablePath", "CompatWritablePath", @@ -40,8 +52,20 @@ "StatResultType", "PathParser", "UPathParser", + "UNSET_DEFAULT", ] +JoinablePathLike: TypeAlias = Union[str, JoinablePath] +ReadablePathLike: TypeAlias = Union[str, ReadablePath] +WritablePathLike: TypeAlias = Union[str, WritablePath] + + +class _DefaultValue(enum.Enum): + UNSET = enum.auto() + + +UNSET_DEFAULT: Any = _DefaultValue.UNSET + class OpenablePath(ReadablePath, WritablePath): """Helper class to annotate read/writable paths which have an .open() method.""" @@ -129,9 +153,9 @@ class CompatReadablePath(CompatJoinablePath, Protocol): # not available in Python 3.9.* pathlib: # - `__open_rb__` # - `info` - # - `readlink` # - `copy` # - `copy_into` + # - `walk` __slots__ = () def read_bytes(self) -> bytes: ... @@ -147,13 +171,6 @@ def iterdir(self) -> Iterator[Self]: ... def glob(self, pattern: str, *, recurse_symlinks: bool = ...) -> Iterator[Self]: ... - def walk( - self, - top_down: bool = ..., - on_error: Callable[[Exception], Any] | None = ..., - follow_symlinks: bool = ..., - ) -> Iterator[Self]: ... - def readlink(self) -> Self: ... @@ -256,3 +273,19 @@ class UPathParser(PathParser, Protocol): """duck-type for upath.core.UPathParser""" def strip_protocol(self, path: JoinablePath | str) -> str: ... + + def join( + self, + path: JoinablePath | os.PathLike[str] | str, + *paths: JoinablePath | os.PathLike[str] | str, + ) -> str: ... + + def isabs(self, path: JoinablePath | os.PathLike[str] | str) -> bool: ... + + def splitdrive( + self, path: JoinablePath | os.PathLike[str] | str + ) -> tuple[str, str]: ... + + def splitroot( + self, path: JoinablePath | os.PathLike[str] | str + ) -> tuple[str, str, str]: ... diff --git a/upath/types/_abc.pyi b/upath/types/_abc.pyi index 3776cb61..2788fb4c 100644 --- a/upath/types/_abc.pyi +++ b/upath/types/_abc.pyi @@ -27,6 +27,7 @@ class JoinablePath(ABC): def with_segments(self, *pathsegments: str | Self) -> Self: ... @abstractmethod def __str__(self) -> str: ... + @property def anchor(self) -> str: ... @property def name(self) -> str: ... @@ -76,7 +77,7 @@ class ReadablePath(JoinablePath): top_down: bool = ..., on_error: OnErrorCallable | None = ..., follow_symlinks: bool = ..., - ): ... + ) -> Iterator[tuple[Self, list[str], list[str]]]: ... @abstractmethod def readlink(self) -> Self: ... def copy(self, target: T, **kwargs: Any) -> T: ... @@ -86,7 +87,9 @@ class WritablePath(JoinablePath): __slots__ = () @abstractmethod - def symlink_to(self, target: WritablePath, target_is_directory: bool = ...): ... + def symlink_to( + self, target: ReadablePath, target_is_directory: bool = ... + ) -> None: ... @abstractmethod def mkdir(self) -> None: ... @abstractmethod @@ -94,7 +97,7 @@ class WritablePath(JoinablePath): def write_bytes(self, data: bytes) -> int: ... def write_text( self, - data: bytes, + data: str, encoding: str | None = ..., errors: str | None = ..., newline: str | None = ...,