# --- patch metadata preserved from the original diff (as comments) ---
# diff --git a/.gitignore b/.gitignore
# index 05e554a64..3f602651b 100644
# --- a/.gitignore
# +++ b/.gitignore
# @@ -16,3 +16,4 @@ _build/
#  build/
#  dist/
#  htmlcov/
# +tests/downloads
# diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
# index 49ae0d4e7..97e3959bd 100644
# --- a/.pre-commit-config.yaml
# +++ b/.pre-commit-config.yaml
# @@ -8,7 +8,7 @@ repos:
#  - id: trailing-whitespace
#  - repo: https://github.com/pre-commit/mirrors-mypy
# - rev: v0.812
# + rev: v0.931
#  hooks:
#  - id: mypy
#  exclude: '^(docs|tasks|tests)|setup\.py'
# diff --git a/packaging/metadata.py b/packaging/metadata.py
# new file mode 100644
# index 000000000..1375df520
# --- /dev/null
# +++ b/packaging/metadata.py
# @@ -0,0 +1,385 @@

# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import dataclasses
import math
from email import message_from_bytes
from email.headerregistry import Address, AddressHeader
from email.message import EmailMessage
from email.policy import EmailPolicy, Policy
from functools import reduce
from inspect import cleandoc
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Collection,
    Dict,
    Iterable,
    Iterator,
    Mapping,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
)

from .requirements import InvalidRequirement, Requirement
from .specifiers import SpecifierSet
from .version import Version

T = TypeVar("T", bound="CoreMetadata")
A = TypeVar("A")
B = TypeVar("B")

if TYPE_CHECKING:  # pragma: no cover
    from typing_extensions import Literal

    NormalizedDynamicFields = Literal[
        "platform",
        "summary",
        "description",
        "keywords",
        "home-page",
        "author",
        "author-email",
        "license",
        "supported-platform",
        "download-url",
        "classifier",
        "maintainer",
        "maintainer-email",
        "requires-dist",
        "requires-python",
        "requires-external",
        "project-url",
        "provides-extra",
        "provides-dist",
        "obsoletes-dist",
        "description-content-type",
    ]
else:
    NormalizedDynamicFields = str


def _normalize_field_name_for_dynamic(field: str) -> NormalizedDynamicFields:
    """Normalize a metadata field name that is acceptable in `dynamic`.

    The field name will be normalized to lower-case. JSON field names are
    also acceptable and will be translated accordingly (``_`` becomes ``-``).
    """
    return cast(NormalizedDynamicFields, field.lower().replace("_", "-"))


def _field_name(field: str) -> str:
    """Equivalent field name in the class representing metadata."""
    return field.lower().replace("-", "_")


# In the following, comparison is disabled because currently `Requirement`
# objects are unhashable/not-comparable.


@dataclasses.dataclass(eq=False)
class CoreMetadata:
    """
    Core metadata for Python packages, represented as a
    :obj:`dataclass <dataclasses.dataclass>`.

    The dataclass is not declared ``frozen``; derive modified copies with
    :func:`dataclasses.replace` rather than mutating an instance in place.

    Specification: https://packaging.python.org/en/latest/specifications/core-metadata/

    Attribute names follow :pep:`PEP 566's JSON guidelines
    <566#json-compatible-metadata>`.
    """

    # 1.0
    name: str
    version: Union[Version, None] = None
    platform: Collection[str] = ()
    summary: str = ""
    description: str = ""
    keywords: Collection[str] = ()
    home_page: str = ""
    author: str = ""
    author_email: Collection[Tuple[Union[str, None], str]] = ()
    license: str = ""
    # license_file: Collection[str] = ()  # not standard yet
    # 1.1
    supported_platform: Collection[str] = ()
    download_url: str = ""
    classifier: Collection[str] = ()
    # 1.2
    maintainer: str = ""
    maintainer_email: Collection[Tuple[Union[str, None], str]] = ()
    requires_dist: Collection[Requirement] = ()
    requires_python: SpecifierSet = dataclasses.field(default_factory=SpecifierSet)
    requires_external: Collection[str] = ()
    project_url: Mapping[str, str] = dataclasses.field(default_factory=dict)
    provides_extra: Collection[str] = ()
    provides_dist: Collection[Requirement] = ()
    obsoletes_dist: Collection[Requirement] = ()
    # 2.1
    description_content_type: str = ""
    # 2.2
    dynamic: Collection[NormalizedDynamicFields] = ()

    @property
    def metadata_version(self) -> str:
        """
        The data structure is always compatible with the latest approved
        version of the spec, even when parsing files that use previous versions.
        """
        return "2.2"

    @classmethod
    def _fields(cls) -> Collection[str]:
        """Names of the dataclass fields, in declaration order."""
        return [f.name for f in dataclasses.fields(cls)]

    @classmethod
    def _process_attrs(
        cls, attrs: Iterable[Tuple[str, Any]]
    ) -> Iterable[Tuple[str, Any]]:
        """Transform input data to the matching attribute types.

        Pairs whose name is not a dataclass field are silently dropped.
        """
        as_set = (cls._MULTIPLE_USE | {"keywords"}) - {"project_url"}
        available_fields = cls._fields()

        for field, value in attrs:
            if field == "version":
                yield ("version", Version(value))
            elif field == "keywords":
                # Drop surrounding whitespace and empty entries so that an
                # empty `Keywords:` header does not become the set `{""}`.
                stripped = (k.strip() for k in value.split(","))
                yield (field, {k for k in stripped if k})
            elif field == "requires_python":
                yield (field, cls._parse_requires_python(value))
            elif field == "project_url":
                yield (field, cls._parse_url(value))
            elif field == "dynamic":
                values = (_normalize_field_name_for_dynamic(f) for f in value)
                yield (field, set(values))
            elif field.endswith("email"):
                yield (field, set(cls._parse_emails(value.strip())))
            elif field.endswith("dist"):
                yield (field, {cls._parse_req(v) for v in value})
            elif field in as_set:
                yield (field, set(value))
            elif field in available_fields:
                yield (field, value)

    @classmethod
    def _parse_pkg_info(cls, pkg_info: bytes) -> Iterable[Tuple[str, Any]]:
        """Parse PKG-INFO data into ``(field, raw value)`` pairs.

        When no ``Description`` header is present the message payload is
        yielded as the description.
        """
        msg = message_from_bytes(pkg_info, EmailMessage, policy=cls._PARSING_POLICY)
        info = cast(EmailMessage, msg)
        has_description = False
        # `info.keys()` repeats a header name once per occurrence, while
        # `get`/`get_all` already return the same data for every repetition;
        # handle each header name only once.
        seen: Set[str] = set()

        for key in info.keys():
            lowered = key.lower()
            if lowered in seen:
                continue
            seen.add(lowered)

            field = _field_name(key)
            if field in cls._UPDATES:
                field = cls._UPDATES[field]

            value = str(info.get(key))  # email.header.Header.__str__ handles encoding

            if field in {"keywords", "summary"} or field.endswith("email"):
                yield (field, cls._ensure_single_line(value))
            elif field == "description":
                has_description = True
                yield (field, cls._unescape_description(value))
            elif field in cls._MULTIPLE_USE:
                yield (field, (str(v) for v in info.get_all(key)))
            else:
                yield (field, value)

        if not has_description:
            yield ("description", str(info.get_payload(decode=True), "utf-8"))

    @classmethod
    def from_pkg_info(
        cls: Type[T], pkg_info: bytes, *, allow_unfilled_dynamic: bool = True
    ) -> T:
        """Parse PKG-INFO data.

        The resulting object is validated before being returned; pass
        ``allow_unfilled_dynamic=False`` to also reject fields listed in
        ``dynamic`` that have no value.
        """
        attrs = cls._process_attrs(cls._parse_pkg_info(pkg_info))
        obj = cls(**dict(attrs))
        obj._validate(allow_unfilled_dynamic)

        return obj

    def to_pkg_info(self, *, allow_unfilled_dynamic: bool = True) -> bytes:
        """Generate PKG-INFO data.

        The object is validated first (see :meth:`from_pkg_info` for the
        meaning of ``allow_unfilled_dynamic``). Fields with falsy values are
        omitted from the output.
        """
        self._validate(allow_unfilled_dynamic)

        info = EmailMessage(self._PARSING_POLICY)
        info.add_header("Metadata-Version", self.metadata_version)
        # Use `sorted` in collections to improve reproducibility
        for field in self._fields():
            value = getattr(self, field)
            if not value:
                continue
            key = self._canonical_field(field)
            if field == "keywords":
                info.add_header(key, ",".join(sorted(value)))
            elif field.endswith("email"):
                _emails = (self._serialize_email(v) for v in value if any(v))
                emails = ", ".join(sorted(v for v in _emails if v))
                if emails:
                    info.add_header(key, emails)
            elif field == "project_url":
                for kind in sorted(value):
                    info.add_header(key, f"{kind}, {value[kind]}")
            elif field == "description":
                info.set_payload(bytes(value, "utf-8"))
            elif field in self._MULTIPLE_USE:
                for single_value in sorted(str(v) for v in value):
                    info.add_header(key, single_value)
            else:
                info.add_header(key, str(value))

        return info.as_bytes()

    # --- Auxiliary Methods and Properties ---
    # Not part of the API, but can be overwritten by subclasses
    # (useful when providing a proof-of-concept for new PEPs)

    _MANDATORY: ClassVar[Set[str]] = {"name", "version"}
    _NOT_DYNAMIC: ClassVar[Set[str]] = {"metadata_version", "dynamic"} | _MANDATORY
    _MULTIPLE_USE: ClassVar[Set[str]] = {
        "dynamic",
        "platform",
        "supported_platform",
        "classifier",
        "requires_dist",
        "requires_external",
        "project_url",
        "provides_extra",
        "provides_dist",
        "obsoletes_dist",
    }
    _UPDATES: ClassVar[Dict[str, str]] = {
        "requires": "requires_dist",  # PEP 314 => PEP 345
        "provides": "provides_dist",  # PEP 314 => PEP 345
        "obsoletes": "obsoletes_dist",  # PEP 314 => PEP 345
    }
    _PARSING_POLICY: ClassVar[Policy] = EmailPolicy(max_line_length=math.inf, utf8=True)

    @classmethod
    def _canonical_field(cls, field: str) -> str:
        """Header spelling for an attribute name (``home_page`` -> ``Home-page``)."""
        words = _normalize_field_name_for_dynamic(field).split("-")
        ucfirst = "-".join(w[0].upper() + w[1:] for w in words)
        replacements = {"Url": "URL", "Email": "email", "Page": "page"}.items()
        return reduce(lambda acc, x: acc.replace(x[0], x[1]), replacements, ucfirst)

    @classmethod
    def _ensure_single_line(cls, value: str) -> str:
        """Existing distributions might include metadata with fields such as 'keywords'
        or 'summary' showing up as multiline strings.
        """
        return " ".join(value.splitlines())

    @classmethod
    def _parse_requires_python(cls, value: str) -> SpecifierSet:
        """Build a specifier set, treating a bare version (``2.5``) as ``==2.5``."""
        if value and value[0].isnumeric():
            value = f"=={value}"
        return SpecifierSet(value)

    @classmethod
    def _parse_req(cls, value: str) -> Requirement:
        """Parse a requirement, tolerating the legacy ``name (1.3)`` form."""
        try:
            return Requirement(value)
        except InvalidRequirement:
            # Some old examples in PEPs use "()" around versions without an operator
            # e.g.: `Provides: xmltools (1.3)`
            name, paren, rest = value.strip().partition("(")
            if not paren:
                # Nothing to repair: surface the original parsing error.
                raise
            return Requirement(f"{name}(=={rest}")

    @classmethod
    def _parse_url(cls, value: Iterable[str]) -> Dict[str, str]:
        """Split ``"label, url"`` entries into a ``{label: url}`` mapping."""
        urls = {}
        for entry in value:
            label, _, url = entry.partition(",")
            urls[label.strip()] = url.strip()
        return urls

    @classmethod
    def _parse_emails(cls, value: str) -> Iterator[Tuple[Union[str, None], str]]:
        """Yield ``(display name, address)`` pairs from an address-list header.

        The placeholder value ``"UNKNOWN"`` yields nothing.
        """
        if value == "UNKNOWN":
            return
        address_list = AddressHeader.value_parser(value)
        for mailbox in address_list.all_mailboxes:
            yield (mailbox.display_name, mailbox.addr_spec)

    @classmethod
    def _serialize_email(cls, value: Tuple[Union[str, None], str]) -> str:
        """Render a ``(display name, address)`` pair as a header mailbox string."""
        return str(Address(value[0] or "", addr_spec=value[1]))

    @classmethod
    def _unescape_description(cls, content: str) -> str:
        """Reverse RFC-822 escaping by removing leading whitespaces from content."""
        lines = cleandoc(content).splitlines()
        if not lines:
            return ""

        # Continuation lines may carry a leading "|" marker; the first line never does.
        continuation = (line.lstrip("|") for line in lines[1:])
        return "\n".join(chain(lines[:1], continuation))

    def _validate(self, allow_unfilled_dynamic: bool) -> bool:
        """Run all validations; raises on the first problem, else returns ``True``."""
        self._validate_required_fields()
        self._validate_dynamic()
        if not allow_unfilled_dynamic:
            self._validate_unfilled_dynamic()

        return True

    def _validate_dynamic(self) -> bool:
        """Check that every entry in ``dynamic`` names a field allowed to be dynamic.

        Raises:
            InvalidCoreMetadataField: the entry is not a metadata field.
            InvalidDynamicField: the entry is a field that cannot be dynamic.
        """
        # Compare against the declared fields instead of `hasattr`, which would
        # also accept method names such as "to-pkg-info".
        known = set(self._fields()) | self._NOT_DYNAMIC
        for item in self.dynamic:
            field = _field_name(item)
            if field not in known:
                raise InvalidCoreMetadataField(item)
            if field in self._NOT_DYNAMIC:
                raise InvalidDynamicField(item)

        return True

    def _validate_unfilled_dynamic(self) -> bool:
        """Raise ``UnfilledDynamicFields`` if a dynamic field has no value."""
        unresolved = [k for k in self.dynamic if not getattr(self, _field_name(k))]
        if unresolved:
            raise UnfilledDynamicFields(unresolved)

        return True

    def _validate_required_fields(self) -> bool:
        """Raise ``MissingRequiredFields`` if a mandatory field has no value."""
        # `_MANDATORY` is a set: sort so that the error message is stable.
        missing_fields = sorted(k for k in self._MANDATORY if not getattr(self, k))
        if missing_fields:
            raise MissingRequiredFields(missing_fields)

        return True


class InvalidCoreMetadataField(ValueError):
    """A name listed in ``dynamic`` is not a core metadata field."""

    def __init__(self, field: str):
        super().__init__(f"{field!r} is not a valid core metadata field")


class InvalidDynamicField(ValueError):
    """A field listed in ``dynamic`` is not allowed to be dynamic."""

    def __init__(self, field: str):
        super().__init__(f"{field!r} cannot be dynamic")


class UnfilledDynamicFields(ValueError):
    """Fields listed in ``dynamic`` have no value where one is required."""

    def __init__(self, fields: Iterable[str]):
        given = ", ".join(repr(f) for f in fields)
        msg = f"Unfilled dynamic fields not allowed in this context (given: {given})"
        super().__init__(msg)


class MissingRequiredFields(ValueError):
    """One or more mandatory fields have no value."""

    def __init__(self, fields: Iterable[str]):
        missing = ", ".join(fields)
        super().__init__(f"Required fields are missing: {missing}")


# --- patch metadata preserved from the original diff (as comments) ---
# diff --git a/tests/metadata_examples.csv b/tests/metadata_examples.csv
# new file mode 100644
# index 000000000..b01eea9d2
# --- patch metadata and CSV fixture preserved from the original diff ---
# --- /dev/null
# +++ b/tests/metadata_examples.csv
# @@ -0,0 +1,18 @@
# +appdirs,1.3.0
# +boto3,1.20.37
# +click,0.3
# +enscons,0.28.0
# +Flask,2.0.2
# +flit,3.6.0
# +hatch,0.23.1
# +pdm,1.12.6
# +poetry,1.1.12
# +ptpython,0.5
# +PyScaffold,4.1.4
# +requests,2.27.1
# +scikit-build,0.12.0
# +setuptools,60.5.0
# +six,1.16.0
# +tomli,2.0.0
# +urllib3,1.26.8
# +wheelfile,0.0.8
# diff --git a/tests/test_metadata.py b/tests/test_metadata.py
# new file mode 100644
# index 000000000..ca0212e12
# --- /dev/null
# +++ b/tests/test_metadata.py
# @@ -0,0 +1,413 @@

# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import dataclasses
import json
import tarfile
from email.policy import compat32
from functools import partial
from hashlib import md5
from itertools import chain
from pathlib import Path
from textwrap import dedent
from typing import Iterator, List
from urllib.request import urlopen
from zipfile import ZipFile

import pytest

from packaging.metadata import (
    CoreMetadata,
    InvalidCoreMetadataField,
    InvalidDynamicField,
    MissingRequiredFields,
    UnfilledDynamicFields,
)
from packaging.requirements import Requirement
from packaging.utils import canonicalize_name
from packaging.version import Version

HERE = Path(__file__).parent
EXAMPLES = HERE / "metadata_examples.csv"
DOWNLOADS = HERE / "downloads"


class TestCoreMetadata:
    """Unit tests for parsing, validating and serializing ``CoreMetadata``."""

    def test_simple(self):
        example = {
            "name": "simple",
            "version": Version("0.1"),
            "requires_dist": [Requirement("appdirs>1.2")],
        }
        metadata = CoreMetadata(**example)
        assert isinstance(metadata.to_pkg_info(), bytes)

    def test_invalid(self):
        example = {
            "name": "simple",
            "author_email": [(None, "me@example.com")],
            "requires_dist": [Requirement("appdirs>1.2")],
        }
        metadata = CoreMetadata(**example)
        with pytest.raises(MissingRequiredFields):  # version is missing
            metadata.to_pkg_info()

        metadata = dataclasses.replace(metadata, version=Version("0.42"))
        with pytest.raises(InvalidCoreMetadataField):
            dataclasses.replace(metadata, dynamic=["myfield"]).to_pkg_info()
        with pytest.raises(InvalidDynamicField):
            dataclasses.replace(metadata, dynamic=["name"]).to_pkg_info()

    # NOTE(review): in the copy this was restored from, whitespace was collapsed
    # and the `<address>` part of the `Author-email`/`Maintainer-email` values
    # was missing. Continuation-line indentation and the address
    # (`cschultz@example.com`, as in the PEP examples) were reconstructed --
    # confirm against the upstream test file.
    PER_VERSION_EXAMPLES = {
        "1.1": {
            "has_dynamic_fields": False,
            "is_final_metadata": True,
            "file_contents": """\
            Metadata-Version: 1.1
            Name: BeagleVote
            Version: 1.0a2
            Platform: ObscureUnix, RareDOS
            Supported-Platform: RedHat 7.2
            Supported-Platform: i386-win32-2791
            Summary: A module for collecting votes from beagles.
            Description: This module collects votes from beagles
                         in order to determine their electoral wishes.
                         Do *not* try to use this module with basset hounds;
                         it makes them grumpy.
            Keywords: dog puppy voting election
            Home-page: http://www.example.com/~cschultz/bvote/
            Author: C. Schultz, Universal Features Syndicate,
                    Los Angeles, CA
            Author-email: "C. Schultz" <cschultz@example.com>
            License: This software may only be obtained by sending the
                     author a postcard, and then the user promises not
                     to redistribute it.
            Classifier: Development Status :: 4 - Beta
            Classifier: Environment :: Console (Text Based)
            Requires: re
            Requires: sys
            Requires: zlib
            Requires: xml.parsers.expat (>1.0)
            Requires: psycopg
            Provides: xml
            Provides: xml.utils
            Provides: xml.utils.iso8601
            Provides: xml.dom
            Provides: xmltools (1.3)
            Obsoletes: Gorgon
            """,  # based on PEP 314
        },
        "2.1": {
            "has_dynamic_fields": False,
            "is_final_metadata": True,
            "file_contents": """\
            Metadata-Version: 2.1
            Name: BeagleVote
            Version: 1.0a2
            Platform: ObscureUnix, RareDOS
            Supported-Platform: RedHat 7.2
            Supported-Platform: i386-win32-2791
            Summary: A module for collecting votes from beagles.
            Description: This project provides powerful math functions
                    |For example, you can use `sum()` to sum numbers:
                    |
                    |Example::
                    |
                    |    >>> sum(1, 2)
                    |    3
                    |
            Keywords: dog puppy voting election
            Home-page: http://www.example.com/~cschultz/bvote/
            Author: C. Schultz, Universal Features Syndicate,
                    Los Angeles, CA
            Author-email: "C. Schultz" <cschultz@example.com>
            Maintainer: C. Schultz, Universal Features Syndicate,
                    Los Angeles, CA
            Maintainer-email: "C. Schultz" <cschultz@example.com>
            License: This software may only be obtained by sending the
                     author a postcard, and then the user promises not
                     to redistribute it.
            Classifier: Development Status :: 4 - Beta
            Classifier: Environment :: Console (Text Based)
            Requires-Dist: pkginfo
            Requires-Dist: PasteDeploy
            Requires-Dist: zope.interface (>3.5.0)
            Provides-Dist: OtherProject
            Provides-Dist: AnotherProject (3.4)
            Provides-Dist: virtual_package
            Obsoletes-Dist: Gorgon
            Obsoletes-Dist: OtherProject (<3.0)
            Requires-Python: 2.5
            Requires-Python: >2.1
            Requires-Python: >=2.3.4
            Requires-Python: >=2.5,<2.7
            Requires-External: C
            Requires-External: libpng (>=1.5)
            Project-URL: Bug Tracker, https://github.com/pypa/setuptools/issues
            Project-URL: Documentation, https://setuptools.readthedocs.io/
            Project-URL: Funding, https://donate.pypi.org
            Requires-Dist: pywin32 (>1.0); sys.platform == 'win32'
            Obsoletes-Dist: pywin31; sys.platform == 'win32'
            Requires-Dist: foo (1,!=1.3); platform.machine == 'i386'
            Requires-Dist: bar; python_version == '2.4' or python_version == '2.5'
            Requires-Dist: baz (>=1,!=1.3); platform.machine == 'i386'
            Requires-External: libxslt; 'linux' in sys.platform
            Provides-Extra: docs
            Description-Content-Type: text/x-rst; charset=UTF-8
            """,  # based on PEP 345 / PEP 566
        },
        "2022-01-16": {
            "has_dynamic_fields": True,
            "is_final_metadata": False,
            "file_contents": """\
            Metadata-Version: 2.2
            Name: BeagleVote
            Version: 1.0a2
            Platform: ObscureUnix
            Platform: RareDOS
            Supported-Platform: RedHat 7.2
            Supported-Platform: i386-win32-2791
            Keywords: dog,puppy,voting,election
            Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
            Author-email: cschuoltz@example.com, snoopy@peanuts.com
            License: GPL version 3, excluding DRM provisions
            Requires-Dist: pkginfo
            Requires-Dist: PasteDeploy
            Requires-Dist: zope.interface (>3.5.0)
            Requires-Dist: pywin32 >1.0; sys_platform == 'win32'
            Requires-Python: >2.6,!=3.0.*,!=3.1.*
            Requires-External: C
            Requires-External: libpng (>=1.5)
            Requires-External: make; sys_platform != "win32"
            Project-URL: Bug Tracker, http://bitbucket.org/tarek/distribute/issues/
            Provides-Extra: pdf
            Requires-Dist: reportlab; extra == 'pdf'
            Provides-Dist: OtherProject
            Provides-Dist: AnotherProject (3.4)
            Provides-Dist: virtual_package; python_version >= "3.4"
            Obsoletes-Dist: Foo; os_name == "posix"
            Dynamic: Maintainer
            Dynamic: Maintainer-email

            This project provides powerful math functions
            For example, you can use `sum()` to sum numbers:

            Example::

                >>> sum(1, 2)
                3

            """,  # https://packaging.python.org/en/latest/specifications/core-metadata
        },
    }

    @pytest.mark.parametrize("spec", PER_VERSION_EXAMPLES.keys())
    def test_parsing(self, spec: str) -> None:
        example = self.PER_VERSION_EXAMPLES[spec]
        text = bytes(dedent(example["file_contents"]), "UTF-8")
        pkg_info = CoreMetadata.from_pkg_info(text)
        if example["is_final_metadata"]:
            metadata = CoreMetadata.from_pkg_info(text, allow_unfilled_dynamic=False)
            assert_equal_metadata(metadata, pkg_info)
        if example["has_dynamic_fields"]:
            with pytest.raises(UnfilledDynamicFields):
                CoreMetadata.from_pkg_info(text, allow_unfilled_dynamic=False)
        for field in ("requires_dist", "provides_dist", "obsoletes_dist"):
            for value in getattr(pkg_info, field):
                assert isinstance(value, Requirement)
        desc = pkg_info.description.splitlines()
        for line in desc:
            assert not line.strip().startswith("|")

    @pytest.mark.parametrize("spec", PER_VERSION_EXAMPLES.keys())
    def test_serializing(self, spec: str) -> None:
        example = self.PER_VERSION_EXAMPLES[spec]
        text = bytes(dedent(example["file_contents"]), "UTF-8")
        pkg_info = CoreMetadata.from_pkg_info(text)
        if example["is_final_metadata"]:
            assert isinstance(pkg_info.to_pkg_info(allow_unfilled_dynamic=False), bytes)
        if example["has_dynamic_fields"]:
            with pytest.raises(UnfilledDynamicFields):
                pkg_info.to_pkg_info(allow_unfilled_dynamic=False)
        pkg_info_text = pkg_info.to_pkg_info()
        assert isinstance(pkg_info_text, bytes)
        # Make sure generated document is not empty
        assert len(pkg_info_text.strip()) > 0
        assert b"Name" in pkg_info_text
        assert b"Metadata-Version" in pkg_info_text
        # Make sure email-specific headers don't leak into the generated document
        assert b"Content-Transfer-Encoding" not in pkg_info_text
        assert b"MIME-Version" not in pkg_info_text

    def test_empty_fields(self):
        metadata = CoreMetadata.from_pkg_info(b"Name: pkg\nVersion: 1\nDescription:\n")
        assert metadata.description == ""
        metadata = CoreMetadata.from_pkg_info(b"Name: pkg\nVersion: 1\nAuthor-email:\n")
        assert metadata.description == ""
        assert len(metadata.author_email) == 0

    def test_single_line_description(self):
        serialized = b"Name: pkg\nVersion: 1\nDescription: Hello World"
        metadata = CoreMetadata.from_pkg_info(serialized)
        assert metadata.description == "Hello World"

    def test_empty_email(self):
        example = {
            "name": "pkg",
            "version": Version("1"),
            "maintainer_email": [("", "")],
        }
        metadata = CoreMetadata(**example)
        serialized = metadata.to_pkg_info()
        assert b"Maintainer-email:" not in serialized


# --- Integration Tests ---


def examples() -> List[List[str]]:
    """Read ``[package, version]`` pairs from the CSV file next to this module."""
    lines = EXAMPLES.read_text().splitlines()
    # Skip blank lines: they would produce a malformed parametrization entry.
    return [[v.strip() for v in line.split(",")] for line in lines if line.strip()]


class TestIntegration:
    """Round-trip the metadata of real distributions downloaded from PyPI."""

    @pytest.mark.parametrize("pkg, version", examples())
    def test_parse(self, pkg: str, version: str) -> None:
        for dist in download_dists(pkg, version):
            from_ = CoreMetadata.from_pkg_info
            to_ = CoreMetadata.to_pkg_info
            if dist.suffix == ".whl":
                orig = read_metadata(dist)
                from_ = partial(from_, allow_unfilled_dynamic=False)
                to_ = partial(to_, allow_unfilled_dynamic=False)
            else:
                orig = read_pkg_info(dist)

            # Given PKG-INFO or METADATA from existing packages on PyPI
            # - Make sure they can be parsed
            metadata = from_(orig)
            assert metadata.name.lower() == pkg.lower()
            assert str(metadata.version) == version
            # - Make sure they can be converted back into PKG-INFO or METADATA
            recons_file = to_(metadata)
            assert len(recons_file) > 0
            # - Make sure that the reconstructed file can be parsed and the data
            #   remains unchanged
            recons_data = from_(recons_file)
            description = metadata.description.replace("\r\n", "\n")
            metadata = dataclasses.replace(metadata, description=description)
            assert_equal_metadata(metadata, recons_data)
            # - Make sure the reconstructed file can be parsed with compat32
            attrs = dataclasses.asdict(_Compat32Metadata.from_pkg_info(recons_file))
            assert CoreMetadata(**attrs)
            # - Make sure that successive calls to `to_...` and `from_...`
            #   always return the same result
            file_contents = recons_file
            data = recons_data
            for _ in range(3):
                result_contents = to_(data)
                assert file_contents == result_contents
                result_data = from_(result_contents)
                assert_equal_metadata(data, result_data)
                file_contents, data = result_contents, result_data


# --- Helper Functions/Classes ---


def assert_equal_metadata(metadata1: CoreMetadata, metadata2: CoreMetadata):
    """Assert field-by-field equality of two metadata objects."""
    fields = (f.name for f in dataclasses.fields(CoreMetadata))
    for field in fields:
        value1, value2 = getattr(metadata1, field), getattr(metadata2, field)
        if field.endswith("dist"):
            # Currently `Requirement` objects are not directly comparable,
            # therefore sets containing those objects are also not comparable.
            # The best approach is to convert requirements to strings first.
            req1, req2 = set(map(str, value1)), set(map(str, value2))
            assert req1 == req2
        elif not value1:
            assert not value2
        else:
            assert value1 == value2


class _Compat32Metadata(CoreMetadata):
    """The Core Metadata spec requires the file to be parse-able with compat32.
    The implementation uses a different approach to ensure UTF-8 can be used.
    Therefore it is important to test against compat32 to make sure nothing
    goes wrong.
    """

    _PARSING_POLICY = compat32


def download(url: str, dest: Path, md5_digest: str) -> Path:
    """Fetch ``url`` into ``dest`` after checking the payload's MD5 digest."""
    with urlopen(url) as response:
        data = response.read()

    assert md5(data).hexdigest() == md5_digest

    dest.write_bytes(data)

    assert dest.exists()

    return dest


def download_dists(pkg: str, version: str) -> List[Path]:
    """Either use cached dist file or download it from PyPI"""
    DOWNLOADS.mkdir(exist_ok=True)

    # Query PyPI once and reuse the answer for both cache pruning and download.
    distributions = list(retrieve_pypi_dist_metadata(pkg, version))
    filenames = {dist["filename"] for dist in distributions}

    # Remove old files to prevent cache to grow indefinitely
    canonical = canonicalize_name(pkg)
    names = [pkg, canonical, canonical.replace("-", "_")]
    for file in chain.from_iterable(DOWNLOADS.glob(f"{n}*") for n in names):
        if file.name not in filenames:
            file.unlink()

    dist_files = []
    for dist in distributions:
        dest = DOWNLOADS / dist["filename"]
        if not dest.exists():
            download(dist["url"], dest, dist["md5_digest"])
        dist_files.append(dest)

    return dist_files


def retrieve_pypi_dist_metadata(package: str, version: str) -> Iterator[dict]:
    """Yield the PyPI file entries (sdist ``.tar.gz`` and wheels) of a release.

    Raises:
        ValueError: the release was yanked.
    """
    # https://warehouse.pypa.io/api-reference/json.html
    id_ = f"{package}/{version}"
    with urlopen(f"https://pypi.org/pypi/{id_}/json") as f:
        metadata = json.load(f)

    if metadata["info"]["yanked"]:
        raise ValueError(f"Release for {package} {version} was yanked")

    # Prefer the release-specific "urls" list; fall back to the "releases"
    # mapping for responses that do not carry it.
    # NOTE(review): "releases" is believed to be gone from this endpoint -- confirm.
    dists = metadata.get("urls")
    if dists is None:
        dists = metadata["releases"][metadata["info"]["version"]]

    for dist in dists:
        if dist["filename"].endswith((".tar.gz", ".whl")):
            yield dist


def read_metadata(wheel: Path) -> bytes:
    """Return the raw ``METADATA`` file stored inside a wheel."""
    with ZipFile(wheel, "r") as zipfile:
        for member in zipfile.namelist():
            if member.endswith(".dist-info/METADATA"):
                return zipfile.read(member)
    raise FileNotFoundError(f"METADATA not found in {wheel}")


def read_pkg_info(sdist: Path) -> bytes:
    """Return the raw ``PKG-INFO`` file stored inside a ``.tar.gz`` sdist."""
    with tarfile.open(sdist, mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.name.endswith("PKG-INFO"):
                file = tar.extractfile(member)
                if file is not None:
                    return file.read()
    raise FileNotFoundError(f"PKG-INFO not found in {sdist}")