From a69c886d2524f5080804b25d153e5d5dd0258e59 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Fri, 4 Mar 2022 16:51:34 +0530 Subject: [PATCH 1/2] Migrate alpine importer to importer improver model Current alpine importer models need to be refactored to give AdvisoryData instead of Advisory, also add some validation to parse license expression and make affected_version_range optional Add tests to test scraping of webpages and parsing of data Signed-off-by: Tushar Goel --- AUTHORS.rst | 1 + requirements.txt | 3 +- vulnerabilities/importer.py | 45 +- vulnerabilities/importers/__init__.py | 3 +- vulnerabilities/importers/alpine_linux.py | 288 ++++++--- vulnerabilities/importers/nginx.py | 1 - vulnerabilities/improve_runner.py | 19 +- vulnerabilities/improver.py | 13 +- vulnerabilities/improvers/__init__.py | 6 +- vulnerabilities/references.py | 58 ++ vulnerabilities/tests/conftest.py | 1 - vulnerabilities/tests/test_alpine.py | 597 ++++++++++++++++-- .../tests/test_data/alpine/v3.11/main.json | 44 ++ .../tests/test_data/alpine/v3.11/main.yaml | 24 - .../test_data/alpine/v3.3/community.json | 1 + .../test_data/alpine/web_pages/directory.html | 25 + .../alpine/web_pages/fail_directory.html | 3 + .../test_data/alpine/web_pages/v3.11.html | 12 + 18 files changed, 948 insertions(+), 196 deletions(-) create mode 100644 vulnerabilities/references.py create mode 100644 vulnerabilities/tests/test_data/alpine/v3.11/main.json delete mode 100644 vulnerabilities/tests/test_data/alpine/v3.11/main.yaml create mode 100644 vulnerabilities/tests/test_data/alpine/v3.3/community.json create mode 100644 vulnerabilities/tests/test_data/alpine/web_pages/directory.html create mode 100644 vulnerabilities/tests/test_data/alpine/web_pages/fail_directory.html create mode 100644 vulnerabilities/tests/test_data/alpine/web_pages/v3.11.html diff --git a/AUTHORS.rst b/AUTHORS.rst index baf9bc569..ba09d47b0 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -13,3 +13,4 @@ The following organizations or individuals have contributed to this repo: - Navonil Das @NavonilDas - Tushar Upadhyay @tushar912 - Hritik Vijay @hritik14 +- Tushar Goel @TG1999 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index fb5cdb4cd..06369f5c7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ django-widget-tweaks>=1.4.8 packageurl-python>=0.9.4 binaryornot>=0.4.4 GitPython>=3.1.17 -univers>=30.0.0 +univers>=30.1.0 saneyaml>=0.5.2 beautifulsoup4>=4.9.3 python-dateutil>=2.8.1 @@ -17,5 +17,6 @@ lxml>=4.6.4 gunicorn>=20.1.0 django-environ==0.4.5 defusedxml==0.7.1 +license-expression>=21.6.14 Markdown==3.3.4 diff --git a/vulnerabilities/importer.py b/vulnerabilities/importer.py index 568a1b14d..f7421c1a2 100644 --- a/vulnerabilities/importer.py +++ b/vulnerabilities/importer.py @@ -38,6 +38,7 @@ from binaryornot.helpers import is_binary_string from git import DiffIndex from git import Repo +from license_expression import Licensing from packageurl import PackageURL from univers.version_range import VersionRange from univers.versions import Version @@ -117,22 +118,28 @@ class AffectedPackage: """ Contains a range of affected versions and a fixed version of a given package The PackageURL supplied must *not* have a version + It must contain either `affected_version_range` or `fixed_version` """ package: PackageURL - affected_version_range: VersionRange + affected_version_range: Optional[VersionRange] = None fixed_version: Optional[Version] = None def __post_init__(self): if self.package.version: - raise ValueError + raise ValueError("The PackageURL supplied must not have a version") + if not (self.affected_version_range or self.fixed_version): + raise ValueError( + "Affected Package should at least have either a fixed version or affected version range" + ) def get_fixed_purl(self): """ Return PackageURL corresponding to object's fixed_version """ - fixed_version = self.fixed_version - fixed_purl = self.package._replace(version=str(fixed_version)) + if not self.fixed_version: + raise ValueError("Affected package should have a fixed version") + fixed_purl = self.package._replace(version=str(self.fixed_version)) return fixed_purl @classmethod @@ -152,7 +159,8 @@ def merge(cls, affected_packages: Iterable): fixed_versions = set() purls = set() for pkg in affected_packages: - affected_version_ranges.add(pkg.affected_version_range) + if pkg.affected_version_range: + affected_version_ranges.add(pkg.affected_version_range) if pkg.fixed_version: fixed_versions.add(pkg.fixed_version) purls.add(pkg.package) @@ -164,9 +172,12 @@ def to_dict(self): """ Return a serializable dict that can be converted back using self.from_dict """ + affected_version_range = None + if self.affected_version_range: + affected_version_range = str(self.affected_version_range) return { "package": self.package.to_dict(), - "affected_version_range": str(self.affected_version_range), + "affected_version_range": affected_version_range, "fixed_version": str(self.fixed_version) if self.fixed_version else None, } @@ -176,9 +187,13 @@ def from_dict(cls, affected_pkg: dict): Return an AffectedPackage object from dict generated by self.to_dict """ package = PackageURL(**affected_pkg["package"]) - affected_version_range = VersionRange.from_string(affected_pkg["affected_version_range"]) + affected_version_range = None + if affected_pkg["affected_version_range"]: + affected_version_range = VersionRange.from_string( + affected_pkg["affected_version_range"] + ) fixed_version = affected_pkg["fixed_version"] - if fixed_version: + if fixed_version and affected_version_range: # TODO: revisit after https://github.com/nexB/univers/issues/10 fixed_version = affected_version_range.version_class(fixed_version) @@ -203,7 +218,7 @@ class AdvisoryData: """ aliases: List[str] = dataclasses.field(default_factory=list) - summary: str = None + summary: Optional[str] = None affected_packages: List[AffectedPackage] = dataclasses.field(default_factory=list) references: List[Reference] = dataclasses.field(default_factory=list) date_published: Optional[datetime.datetime] = None @@ -217,6 +232,10 @@ class NoLicenseError(Exception): pass +class InvalidSPDXLicense(Exception): + pass + + class Importer: """ An Importer collects data from various upstreams and returns corresponding AdvisoryData objects @@ -224,10 +243,18 @@ class Importer: """ spdx_license_expression = "" + license_url = "" def __init__(self): if not self.spdx_license_expression: raise Exception(f"Cannot run importer {self!r} without a license") + licensing = Licensing() + try: + licensing.parse(self.spdx_license_expression) + except InvalidSPDXLicense as e: + raise ValueError( + f"{self.spdx_license_expression!r} is not a valid SPDX license expression" + ) from e @classproperty def qualified_name(cls): diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index f2668c654..d3316316d 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -19,8 +19,9 @@ # for any legal advice. # VulnerableCode is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/vulnerablecode/ for support and download. +from vulnerabilities.importers import alpine_linux from vulnerabilities.importers import nginx -IMPORTERS_REGISTRY = [nginx.NginxImporter] +IMPORTERS_REGISTRY = [nginx.NginxImporter, alpine_linux.AlpineImporter] IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY} diff --git a/vulnerabilities/importers/alpine_linux.py b/vulnerabilities/importers/alpine_linux.py index f6ca757f7..350e906f0 100644 --- a/vulnerabilities/importers/alpine_linux.py +++ b/vulnerabilities/importers/alpine_linux.py @@ -20,124 +20,246 @@ # for any legal advice. # VulnerableCode is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/vulnerablecode/ for support and download. +import logging from typing import Any +from typing import Iterable from typing import List from typing import Mapping -from typing import Set +from urllib.parse import urljoin import requests -import saneyaml from bs4 import BeautifulSoup +from django.db.models.query import QuerySet +from packageurl import PackageURL +from univers.versions import AlpineLinuxVersion from vulnerabilities.helpers import is_cve -from vulnerabilities.importer import Advisory +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage from vulnerabilities.importer import Importer -from vulnerabilities.importer import Reference +from vulnerabilities.improver import MAX_CONFIDENCE +from vulnerabilities.improver import Improver +from vulnerabilities.improver import Inference +from vulnerabilities.models import Advisory +from vulnerabilities.references import WireSharkReference +from vulnerabilities.references import XsaReference +from vulnerabilities.references import ZbxReference +LOGGER = logging.getLogger(__name__) BASE_URL = "https://secdb.alpinelinux.org/" class AlpineImporter(Importer): - @staticmethod - def fetch_advisory_links(): - index_page = BeautifulSoup(requests.get(BASE_URL).content, features="lxml") - - alpine_versions = [ - link.text for link in index_page.find_all("a") if link.text.startswith("v") - ] - - advisory_directory_links = [f"{BASE_URL}{version}" for version in alpine_versions] + spdx_license_expression = "CC-BY-SA-4.0" + license_url = "https://secdb.alpinelinux.org/license.txt" + def advisory_data(self) -> Iterable[AdvisoryData]: + advisories = [] + page_response_content = fetch_response(BASE_URL).content + advisory_directory_links = fetch_advisory_directory_links(page_response_content) advisory_links = [] for advisory_directory_link in advisory_directory_links: - advisory_directory_page = requests.get(advisory_directory_link).content - advisory_directory_page = BeautifulSoup(advisory_directory_page, features="lxml") + advisory_directory_page = fetch_response(advisory_directory_link).content advisory_links.extend( - [ - f"{advisory_directory_link}{anchore_tag.text}" - for anchore_tag in advisory_directory_page.find_all("a") - if anchore_tag.text.endswith("yaml") - ] + fetch_advisory_links(advisory_directory_page, advisory_directory_link) ) + for link in advisory_links: + record = fetch_response(link).json() + if not record["packages"]: + LOGGER.error(f'"packages" not found in {link!r}') + continue + advisories.extend(process_record(record)) + return advisories - return advisory_links - def updated_advisories(self) -> Set[Advisory]: - advisories = [] - advisory_links = self.fetch_advisory_links() - for link in advisory_links: - advisories.extend(self._process_link(link)) +def fetch_response(url): + """ + Fetch and return `response` from the `url` + """ + response = requests.get(url) + if response.status_code == 200: + return response + raise Exception(f"Failed to fetch data from {url!r} with status code: {response.status_code!r}") - return self.batch_advisories(advisories) - def _process_link(self, link) -> List[Advisory]: - advisories = [] - yaml_response = requests.get(link).content - record = saneyaml.load(yaml_response) +def fetch_advisory_directory_links(page_response_content: str) -> List[str]: + """ + Return a list of advisory directory links present in `page_response_content` html string + """ + index_page = BeautifulSoup(page_response_content, features="lxml") + alpine_versions = [ + link.text + for link in index_page.find_all("a") + if link.text.startswith("v") or link.text.startswith("edge") + ] - if record["packages"] is None: - return advisories + if not alpine_versions: + LOGGER.error(f"No versions found in {BASE_URL!r}") + return [] - for p in record["packages"]: - advisories.extend( - self._load_advisories( - p["pkg"], - ) - ) + advisory_directory_links = [urljoin(BASE_URL, version) for version in alpine_versions] - return advisories + return advisory_directory_links - @staticmethod - def _load_advisories( - pkg_infos: Mapping[str, Any], - ) -> List[Advisory]: - advisories = [] +def fetch_advisory_links( + advisory_directory_page: str, advisory_directory_link: str +) -> Iterable[str]: + """ + Yield json file urls present in `advisory_directory_page` + """ + advisory_directory_page = BeautifulSoup(advisory_directory_page, features="lxml") + anchor_tags = advisory_directory_page.find_all("a") + if not anchor_tags: + LOGGER.error(f"No anchor tags found in {advisory_directory_link!r}") + return iter([]) + for anchor_tag in anchor_tags: + if anchor_tag.text.endswith("json"): + yield urljoin(advisory_directory_link, anchor_tag.text) + + +def check_for_attributes(record) -> bool: + attributes = ["distroversion", "reponame", "archs"] + for attribute in attributes: + if attribute not in record: + LOGGER.error(f'"{attribute!r}" not found in {record!r}') + return False + return True + + +def process_record(record: dict) -> List[AdvisoryData]: + """ + Return a list of AdvisoryData objects by processing data + present in that `record` + """ + if not record["packages"]: + LOGGER.error(f'"packages" not found in this record {record!r}') + return [] + + advisories: List[AdvisoryData] = [] - for fixed_vulns in pkg_infos["secfixes"].values(): + for package in record["packages"]: + if not package["pkg"]: + LOGGER.error(f'"pkg" not found in this package {package!r}') + continue + if not check_for_attributes(record): + continue + loaded_advisories = load_advisories( + package["pkg"], + record["distroversion"], + record["reponame"], + record["archs"], + ) + advisories.extend(loaded_advisories) + return advisories - if fixed_vulns is None: + +def load_advisories( + pkg_infos: Mapping[str, Any], + distroversion: str, + reponame: str, + archs: List[str], +) -> Iterable[AdvisoryData]: + """ + Yield AdvisoryData by mapping data from `pkg_infos` + and form PURL for AffectedPackages by using + `distroversion`, `reponame`, `archs` + """ + if not pkg_infos.get("name"): + LOGGER.error(f'"name" is not available in package {pkg_infos!r}') + return [] + + for version, fixed_vulns in pkg_infos["secfixes"].items(): + if not fixed_vulns: + LOGGER.error(f"No fixed vulnerabilities in version {version!r}") + continue + + for vuln_ids in fixed_vulns: + if not isinstance(vuln_ids, str): + LOGGER.error(f"{vuln_ids!r} is not of `str` instance") continue + vuln_ids = vuln_ids.split() + aliases = [] + vuln_id = vuln_ids[0] + # check for valid vuln ID, if there is valid vuln ID then iterate over + # the remaining elements of the list else iterate over the whole list + # and also check if the initial element is a reference or not + if is_cve(vuln_id): + aliases = [vuln_id] + vuln_ids = vuln_ids[1:] + references = [] + for reference_id in vuln_ids: - for vuln_ids in fixed_vulns: - vuln_ids = vuln_ids.split() - references = [] - for reference_id in vuln_ids[1:]: - - if reference_id.startswith("XSA"): - xsa_id = reference_id.split("-")[-1] - references.append( - Reference( - reference_id=reference_id, - url="https://xenbits.xen.org/xsa/advisory-{}.html".format(xsa_id), - ) - ) + if reference_id.startswith("XSA"): + references.append(XsaReference.from_id(xsa_id=reference_id)) - elif reference_id.startswith("ZBX"): - references.append( - Reference( - reference_id=reference_id, - url="https://support.zabbix.com/browse/{}".format(reference_id), - ) - ) + elif reference_id.startswith("ZBX"): + references.append(ZbxReference.from_id(zbx_id=reference_id)) - elif reference_id.startswith("wnpa-sec"): - references.append( - Reference( - reference_id=reference_id, - url="https://www.wireshark.org/security/{}.html".format( - reference_id - ), - ) - ) + elif reference_id.startswith("wnpa-sec"): + references.append(WireSharkReference.from_id(wnpa_sec_id=reference_id)) + + qualifiers = { + "distroversion": distroversion, + "reponame": reponame, + } - # TODO: Handle the CVE-????-????? case - advisories.append( - Advisory( - summary="", - references=references, - vulnerability_id=vuln_ids[0] if is_cve(vuln_ids[0]) else "", + affected_packages = [] + + try: + fixed_version = AlpineLinuxVersion(version) + except Exception as e: + LOGGER.error(f"{version!r} is not a valid AlpineVersion {e!r}") + continue + if not isinstance(archs, List): + LOGGER.error(f"{archs!r} is not of `List` instance") + continue + if archs: + for arch in archs: + qualifiers["arch"] = arch + affected_packages.append( + AffectedPackage( + package=PackageURL( + type="alpine", + name=pkg_infos["name"], + qualifiers=qualifiers, + ), + fixed_version=fixed_version, + ) + ) + else: + # there is no arch, this is not an arch-specific package + affected_packages.append( + AffectedPackage( + package=PackageURL( + type="alpine", + name=pkg_infos["name"], + qualifiers=qualifiers, + ), + fixed_version=fixed_version, ) ) - return advisories + yield AdvisoryData( + references=references, + affected_packages=affected_packages, + aliases=aliases, + ) + + +class AlpineBasicImprover(Improver): + @property + def interesting_advisories(self) -> QuerySet: + return Advisory.objects.filter(created_by=AlpineImporter.qualified_name) + + def get_inferences(self, advisory_data: AdvisoryData) -> Iterable[Inference]: + """ + Generate and return Inferences for the given advisory data + """ + for affected_package in advisory_data.affected_packages: + fixed_purl = affected_package.get_fixed_purl() + yield Inference.from_advisory_data( + advisory_data, + confidence=MAX_CONFIDENCE, + fixed_purl=fixed_purl, + ) diff --git a/vulnerabilities/importers/nginx.py b/vulnerabilities/importers/nginx.py index 1c3cded7c..aef551c57 100644 --- a/vulnerabilities/importers/nginx.py +++ b/vulnerabilities/importers/nginx.py @@ -19,7 +19,6 @@ # for any legal advice. # VulnerableCode is a free software tool from nexB Inc. and others. # Visit https://github.com/nexB/vulnerablecode/ for support and download. - import asyncio import dataclasses import datetime diff --git a/vulnerabilities/improve_runner.py b/vulnerabilities/improve_runner.py index 549f74d92..91a33e8c6 100644 --- a/vulnerabilities/improve_runner.py +++ b/vulnerabilities/improve_runner.py @@ -76,15 +76,16 @@ def process_inferences(inferences: List[Inference], advisory: Advisory, improver if updated: logger.info("Severity updated for reference {ref!r} to {severity.value!r}") - for pkg in inference.affected_purls: - vulnerable_package, _ = _get_or_create_package(pkg) - models.PackageRelatedVulnerability( - vulnerability=vuln, - package=vulnerable_package, - created_by=improver_name, - confidence=inference.confidence, - fix=False, - ).update_or_create() + if inference.affected_purls: + for pkg in inference.affected_purls: + vulnerable_package, _ = _get_or_create_package(pkg) + models.PackageRelatedVulnerability( + vulnerability=vuln, + package=vulnerable_package, + created_by=improver_name, + confidence=inference.confidence, + fix=False, + ).update_or_create() if inference.fixed_purl: fixed_package, _ = _get_or_create_package(inference.fixed_purl) diff --git a/vulnerabilities/improver.py b/vulnerabilities/improver.py index defd4af51..27e6a26cd 100644 --- a/vulnerabilities/improver.py +++ b/vulnerabilities/improver.py @@ -27,10 +27,10 @@ class Inference: """ vulnerability_id: str = None - aliases: List[str] = dataclasses.field(default_factory=list) + aliases: Optional[List[str]] = dataclasses.field(default_factory=list) confidence: int = MAX_CONFIDENCE summary: Optional[str] = None - affected_purls: List[PackageURL] = dataclasses.field(default_factory=list) + affected_purls: Optional[List[PackageURL]] = dataclasses.field(default_factory=list) fixed_purl: PackageURL = None references: List[Reference] = dataclasses.field(default_factory=list) @@ -48,7 +48,12 @@ def __post_init__(self): ) versionless_purls = [] - for purl in self.affected_purls + [self.fixed_purl]: + purls = [] + if self.fixed_purl: + purls.append(self.fixed_purl) + if self.affected_purls: + purls.extend(self.affected_purls) + for purl in purls: if purl and not purl.version: versionless_purls.append(purl) @@ -57,7 +62,7 @@ def __post_init__(self): ), f"Version-less purls are not supported in an Inference: {versionless_purls}" @classmethod - def from_advisory_data(cls, advisory_data, confidence, affected_purls, fixed_purl): + def from_advisory_data(cls, advisory_data, confidence, fixed_purl, affected_purls=None): """ Return an Inference object while keeping the same values as of advisory_data for vulnerability_id, summary and references diff --git a/vulnerabilities/improvers/__init__.py b/vulnerabilities/improvers/__init__.py index 32b023497..fdb48b6c1 100644 --- a/vulnerabilities/improvers/__init__.py +++ b/vulnerabilities/improvers/__init__.py @@ -1,6 +1,10 @@ from vulnerabilities import importers from vulnerabilities.improvers import default -IMPROVERS_REGISTRY = [default.DefaultImprover, importers.nginx.NginxBasicImprover] +IMPROVERS_REGISTRY = [ + default.DefaultImprover, + importers.nginx.NginxBasicImprover, + importers.alpine_linux.AlpineBasicImprover, +] IMPROVERS_REGISTRY = {x.qualified_name: x for x in IMPROVERS_REGISTRY} diff --git a/vulnerabilities/references.py b/vulnerabilities/references.py new file mode 100644 index 000000000..e51023ef3 --- /dev/null +++ b/vulnerabilities/references.py @@ -0,0 +1,58 @@ +from vulnerabilities.importer import Reference + + +class XsaReference(Reference): + """ + A Xen advisory reference. See https://xenbits.xen.org/xsa + """ + + @classmethod + def from_id(cls, xsa_id): + """ + Return a new XsaReference from an XSA-XXXX id. + """ + if not xsa_id or not xsa_id.lower().startswith("xsa"): + return ValueError(f"Not a Xen reference. Does not start with XSA: {xsa_id!r}") + _, numid = xsa_id.rsplit("-") + return cls( + reference_id=xsa_id, + url=f"https://xenbits.xen.org/xsa/advisory-{numid}.html", + ) + + +class ZbxReference(Reference): + """ + A Zabbix advisory reference. See https://support.zabbix.com + """ + + @classmethod + def from_id(cls, zbx_id): + """ + Return a new ZbxReference from an ZBX-XXXX id. + """ + if not zbx_id or not zbx_id.lower().startswith("zbx"): + return ValueError(f"Not a Zabbix reference. Does not start with ZBX: {zbx_id!r}") + return cls( + reference_id=zbx_id, + url=f"https://support.zabbix.com/browse/{zbx_id}", + ) + + +class WireSharkReference(Reference): + """ + A Wireshark advisory reference. See https://www.wireshark.org/security + """ + + @classmethod + def from_id(cls, wnpa_sec_id): + """ + Return a new WireSharkReference from an wnpa-sec-XXXX id. + """ + if not wnpa_sec_id or not wnpa_sec_id.lower().startswith("wnpa-sec"): + return ValueError( + f"Not a WireShark reference. Does not start with wnpa-sec: {wnpa_sec_id!r}" + ) + return cls( + reference_id=wnpa_sec_id, + url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html", + ) diff --git a/vulnerabilities/tests/conftest.py b/vulnerabilities/tests/conftest.py index d6072ff86..da9309274 100644 --- a/vulnerabilities/tests/conftest.py +++ b/vulnerabilities/tests/conftest.py @@ -43,7 +43,6 @@ def no_rmtree(monkeypatch): collect_ignore = [ "test_models.py", "test_msr2019.py", - "test_alpine.py", "test_nginx.py", "test_apache_httpd.py", "test_npm.py", diff --git a/vulnerabilities/tests/test_alpine.py b/vulnerabilities/tests/test_alpine.py index 4b2345795..3f1f5f950 100644 --- a/vulnerabilities/tests/test_alpine.py +++ b/vulnerabilities/tests/test_alpine.py @@ -20,69 +20,542 @@ # for any legal advice. # VulnerableCode is a free software tool from nexB Inc. and others. # Visit https://github.com/nexB/vulnerablecode/ for support and download. +import json import os -from unittest import TestCase -from unittest.mock import MagicMock -from unittest.mock import patch -from vulnerabilities.importer import Advisory -from vulnerabilities.importer import Reference -from vulnerabilities.importers.alpine_linux import AlpineImporter +import pytest +from packageurl import PackageURL +from univers.versions import AlpineLinuxVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importers.alpine_linux import fetch_advisory_directory_links +from vulnerabilities.importers.alpine_linux import fetch_advisory_links +from vulnerabilities.importers.alpine_linux import load_advisories +from vulnerabilities.importers.alpine_linux import process_record +from vulnerabilities.references import XsaReference BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA = os.path.join(BASE_DIR, "test_data", "alpine", "v3.11") - - -class AlpineImportTest(TestCase): - @classmethod - def setUpClass(cls): - cls.data_source = AlpineImporter(batch_size=1) - - def test__process_link(self): - expected_advisories = [ - Advisory( - summary="", - references=[], - vulnerability_id="CVE-2019-14904", - ), - Advisory( - summary="", - references=[], - vulnerability_id="CVE-2019-14905", - ), - Advisory( - summary="", - references=[], - vulnerability_id="CVE-2019-14846", - ), - Advisory( - summary="", - references=[], - vulnerability_id="CVE-2019-14856", - ), - Advisory( - summary="", - references=[], - vulnerability_id="CVE-2019-14858", - ), - Advisory( - summary="", - references=[ - Reference( - url="https://xenbits.xen.org/xsa/advisory-295.html", reference_id="XSA-295" - ) - ], - vulnerability_id="", - ), - ] - mock_requests = MagicMock() - mock_content = MagicMock() - with open(os.path.join(TEST_DATA, "main.yaml")) as f: - mock_requests.get = lambda x: mock_content - mock_content.content = f - with patch("vulnerabilities.importers.alpine_linux.requests", new=mock_requests): - found_advisories = self.data_source._process_link("does not matter") - - found_advisories = list(map(Advisory.normalized, found_advisories)) - expected_advisories = list(map(Advisory.normalized, expected_advisories)) - assert sorted(found_advisories) == sorted(expected_advisories) +TEST_DATA = os.path.join(BASE_DIR, "test_data", "alpine") + + +def test_process_record(caplog): + expected_advisories = [ + AdvisoryData( + aliases=[], + summary=None, + affected_packages=[ + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r1"), + ), + ], + references=[ + XsaReference( + reference_id="XSA-248", + url="https://xenbits.xen.org/xsa/advisory-248.html", + severities=[], + ) + ], + date_published=None, + ), + AdvisoryData( + aliases=["CVE-2018-7540"], + summary=None, + affected_packages=[ + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="xen", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="4.10.0-r2"), + ), + ], + references=[ + XsaReference( + reference_id="XSA-252", + url="https://xenbits.xen.org/xsa/advisory-252.html", + severities=[], + ) + ], + date_published=None, + ), + AdvisoryData( + aliases=["CVE-2017-9669"], + summary=None, + affected_packages=[ + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + ], + references=[], + date_published=None, + ), + AdvisoryData( + aliases=["CVE-2017-9671"], + summary=None, + affected_packages=[ + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + AffectedPackage( + package=PackageURL( + type="alpine", + namespace=None, + name="apk-tools", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version=AlpineLinuxVersion(string="2.7.2-r0"), + ), + ], + references=[], + date_published=None, + ), + ] + with open(os.path.join(TEST_DATA, os.path.join(TEST_DATA, "v3.11", "main.json"))) as f: + found_advisories = process_record(json.loads(f.read())) + assert found_advisories == expected_advisories + assert ( + "'4.10-1-r1' is not a valid AlpineVersion InvalidVersion(\"'4.10-1-r1' is not a valid \")" + in caplog.text + ) + + +def test_fetch_advisory_directory_links(): + expected = [ + "https://secdb.alpinelinux.org/edge/", + "https://secdb.alpinelinux.org/v3.10/", + "https://secdb.alpinelinux.org/v3.11/", + "https://secdb.alpinelinux.org/v3.12/", + "https://secdb.alpinelinux.org/v3.13/", + "https://secdb.alpinelinux.org/v3.14/", + "https://secdb.alpinelinux.org/v3.15/", + "https://secdb.alpinelinux.org/v3.2/", + "https://secdb.alpinelinux.org/v3.3/", + "https://secdb.alpinelinux.org/v3.4/", + "https://secdb.alpinelinux.org/v3.5/", + "https://secdb.alpinelinux.org/v3.6/", + "https://secdb.alpinelinux.org/v3.7/", + "https://secdb.alpinelinux.org/v3.8/", + "https://secdb.alpinelinux.org/v3.9/", + ] + with open(os.path.join(TEST_DATA, "web_pages", "directory.html")) as f: + assert fetch_advisory_directory_links(f.read()) == expected + + +def test_fetch_advisory_directory_links_failure(caplog): + with open(os.path.join(TEST_DATA, "web_pages", "fail_directory.html")) as f: + assert fetch_advisory_directory_links(f.read()) == [] + assert "No versions found in 'https://secdb.alpinelinux.org/'" in caplog.text + + +def test_fetch_advisory_links(): + expected = [ + "https://secdb.alpinelinux.org/v3.11/community.json", + "https://secdb.alpinelinux.org/v3.11/main.json", + ] + with open(os.path.join(TEST_DATA, "web_pages", "v3.11.html")) as f: + assert ( + list(fetch_advisory_links(f.read(), "https://secdb.alpinelinux.org/v3.11/")) == expected + ) + + +def test_fetch_advisory_links_failure(caplog): + with open(os.path.join(TEST_DATA, "web_pages", "fail_directory.html")) as f: + assert list(fetch_advisory_links(f.read(), "v3.11")) == [] + assert "No anchor tags found in 'v3.11'" in caplog.text + + +def test_process_record_without_packages(caplog): + with open(os.path.join(TEST_DATA, os.path.join(TEST_DATA, "v3.3", "community.json"))) as f: + assert process_record(json.loads(f.read())) == [] + assert ( + "\"packages\" not found in this record {'apkurl': '{{urlprefix}}/{{distroversion}}/{{reponame}}/{{arch}}/{{pkg.name}}-{{pkg.ver}}.apk', 'archs': ['armhf', 'x86', 'x86_64'], 'reponame': 'community', 'urlprefix': 'https://dl-cdn.alpinelinux.org/alpine', 'distroversion': 'v3.3', 'packages': []}" + in caplog.text + ) + + +def test_load_advisories_package_without_name(caplog): + package = { + "secfixes": {"4.10.0-r1": ["XSA-248"], "4.10.0-r2": ["CVE-2018-7540 XSA-252"]}, + } + list(load_advisories(package, "v3.11", "main", archs=[])) + assert ( + "\"name\" is not available in package {'secfixes': {'4.10.0-r1': ['XSA-248'], '4.10.0-r2': ['CVE-2018-7540 XSA-252']}}" + in caplog.text + ) + + +def test_load_advisories_package_without_secfixes(caplog): + package = { + "name": "xen", + "secfixes": {"4.10.0-r1": []}, + } + list(load_advisories(package, "v3.11", "main", archs=[])) + assert "No fixed vulnerabilities in version '4.10.0-r1'" in caplog.text + + +@pytest.mark.parametrize( + "test_case", + [ + # these are the tests are not supported yet + # when we start supporting these version, + # they will be moved back to main test suite + "1.9.5p2-r0", + "6.6.2p1-r0", + "6.6.4p1-r1", + "4.10-1-r1", + ], +) +def test_load_advisories_package_with_invalid_alpine_version(test_case, caplog): + package = { + "name": "xen", + "secfixes": {f"{test_case}": ["XSA-248"]}, + } + list(load_advisories(package, "v3.11", "main", archs=[])) + assert ( + f"{test_case!r} is not a valid AlpineVersion InvalidVersion(\"{test_case!r} is not a valid \")" + in caplog.text + ) diff --git a/vulnerabilities/tests/test_data/alpine/v3.11/main.json b/vulnerabilities/tests/test_data/alpine/v3.11/main.json new file mode 100644 index 000000000..dbc15df96 --- /dev/null +++ b/vulnerabilities/tests/test_data/alpine/v3.11/main.json @@ -0,0 +1,44 @@ +{ + "apkurl": "{{urlprefix}}/{{distroversion}}/{{reponame}}/{{arch}}/{{pkg.name}}-{{pkg.ver}}.apk", + "archs": [ + "aarch64", + "armhf", + "armv7", + "ppc64le", + "s390x", + "x86", + "x86_64" + ], + "reponame": "main", + "urlprefix": "https://dl-cdn.alpinelinux.org/alpine", + "distroversion": "v3.11", + "packages": [ + { + "pkg": { + "name": "xen", + "secfixes": { + "4.10.0-r1": [ + "XSA-248" + ], + "4.10-1-r1": [ + "XSA-252" + ], + "4.10.0-r2": [ + "CVE-2018-7540 XSA-252" + ] + } + } + }, + { + "pkg": { + "name": "apk-tools", + "secfixes": { + "2.7.2-r0": [ + "CVE-2017-9669", + "CVE-2017-9671" + ] + } + } + } + ] + } \ No newline at end of file diff --git a/vulnerabilities/tests/test_data/alpine/v3.11/main.yaml b/vulnerabilities/tests/test_data/alpine/v3.11/main.yaml deleted file mode 100644 index 2bacbd0ba..000000000 --- a/vulnerabilities/tests/test_data/alpine/v3.11/main.yaml +++ /dev/null @@ -1,24 +0,0 @@ ---- -distroversion: v3.11 -reponame: main -archs: - - x86_64 -urlprefix: http://dl-cdn.alpinelinux.org/alpine -apkurl: "{{urlprefix}}/{{distroversion}}/{{reponame}}/{{arch}}/{{pkg.name}}-{{pkg.ver}}.apk" -packages: - - pkg: - name: ansible - secfixes: - 2.9.3-r0: - - CVE-2019-14904 - - CVE-2019-14905 - 2.8.6-r0: - - CVE-2019-14846 - - CVE-2019-14856 - - CVE-2019-14858 - - pkg: - name: xen - secfixes: - 4.12.1-r0: - - CVE-????-????? XSA-295 - diff --git a/vulnerabilities/tests/test_data/alpine/v3.3/community.json b/vulnerabilities/tests/test_data/alpine/v3.3/community.json new file mode 100644 index 000000000..476ec056b --- /dev/null +++ b/vulnerabilities/tests/test_data/alpine/v3.3/community.json @@ -0,0 +1 @@ +{"apkurl":"{{urlprefix}}/{{distroversion}}/{{reponame}}/{{arch}}/{{pkg.name}}-{{pkg.ver}}.apk","archs":["armhf","x86","x86_64"],"reponame":"community","urlprefix":"https://dl-cdn.alpinelinux.org/alpine","distroversion":"v3.3","packages":[]} \ No newline at end of file diff --git a/vulnerabilities/tests/test_data/alpine/web_pages/directory.html b/vulnerabilities/tests/test_data/alpine/web_pages/directory.html new file mode 100644 index 000000000..5a824654a --- /dev/null +++ b/vulnerabilities/tests/test_data/alpine/web_pages/directory.html @@ -0,0 +1,25 @@ + + + Index of / + +

Index of /


../
+    edge/                                              27-Feb-2022 12:30       -
+    v3.10/                                             29-Jul-2021 09:11       -
+    v3.11/                                             23-Nov-2021 23:18       -
+    v3.12/                                             26-Feb-2022 19:25       -
+    v3.13/                                             24-Feb-2022 20:33       -
+    v3.14/                                             24-Feb-2022 20:27       -
+    v3.15/                                             24-Feb-2022 20:18       -
+    v3.2/                                              07-Jun-2021 20:24       -
+    v3.3/                                              07-Jun-2021 20:25       -
+    v3.4/                                              07-Jun-2021 20:25       -
+    v3.5/                                              07-Jun-2021 20:26       -
+    v3.6/                                              21-May-2021 11:55       -
+    v3.7/                                              28-Apr-2021 20:53       -
+    v3.8/                                              28-Apr-2021 20:53       -
+    v3.9/                                              07-Jun-2021 20:05       -
+    last-update                                        28-Feb-2022 15:17      11
+    license.txt                                        25-Jun-2021 19:25     20K
+    

+ + \ No newline at end of file diff --git a/vulnerabilities/tests/test_data/alpine/web_pages/fail_directory.html b/vulnerabilities/tests/test_data/alpine/web_pages/fail_directory.html new file mode 100644 index 000000000..5c1870aa5 --- /dev/null +++ b/vulnerabilities/tests/test_data/alpine/web_pages/fail_directory.html @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/vulnerabilities/tests/test_data/alpine/web_pages/v3.11.html b/vulnerabilities/tests/test_data/alpine/web_pages/v3.11.html new file mode 100644 index 000000000..36ef8bb5e --- /dev/null +++ b/vulnerabilities/tests/test_data/alpine/web_pages/v3.11.html @@ -0,0 +1,12 @@ + + + Index of /v3.11/ + +

Index of /v3.11/


../
+    community.json                                     21-Jun-2021 20:49     28K
+    community.yaml                                     21-Jun-2021 20:49     36K
+    main.json                                          23-Nov-2021 23:18     48K
+    main.yaml                                          23-Nov-2021 23:18     60K
+    

+ + \ No newline at end of file From 104c76071e968454dea220c4cc58c60ca0ca0d89 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Fri, 4 Mar 2022 17:50:53 +0530 Subject: [PATCH 2/2] fix nix Signed-off-by: Tushar Goel --- vulnerabilities/importer.py | 5 ++++- vulnerabilities/improvers/default.py | 14 +++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/vulnerabilities/importer.py b/vulnerabilities/importer.py index f7421c1a2..c7408dc48 100644 --- a/vulnerabilities/importer.py +++ b/vulnerabilities/importer.py @@ -188,7 +188,10 @@ def from_dict(cls, affected_pkg: dict): """ package = PackageURL(**affected_pkg["package"]) affected_version_range = None - if affected_pkg["affected_version_range"]: + if ( + affected_pkg["affected_version_range"] + and affected_pkg["affected_version_range"] != "None" + ): affected_version_range = VersionRange.from_string( affected_pkg["affected_version_range"] ) diff --git a/vulnerabilities/improvers/default.py b/vulnerabilities/improvers/default.py index d668c6e40..20c5fb411 100644 --- a/vulnerabilities/improvers/default.py +++ b/vulnerabilities/improvers/default.py @@ -64,14 +64,14 @@ def get_exact_purls(affected_package: AffectedPackage) -> (List[PackageURL], Pac # We need ``if c`` below because univers returns None as version # in case of vers:nginx/* # TODO: Revisit after https://github.com/nexB/univers/issues/33 - range_versions = [c.version for c in vr.constraints if c] - resolved_versions = [v for v in range_versions if v and v in vr] - affected_purls = [] - for version in resolved_versions: - affected_purl = affected_package.package._replace(version=str(version)) - affected_purls.append(affected_purl) + if vr: + range_versions = [c.version for c in vr.constraints if c] + resolved_versions = [v for v in range_versions if v and v in vr] + for version in resolved_versions: + affected_purl = affected_package.package._replace(version=str(version)) + affected_purls.append(affected_purl) - fixed_purl = affected_package.get_fixed_purl() + fixed_purl = affected_package.get_fixed_purl() if affected_package.fixed_version else None return affected_purls, fixed_purl