diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 414cae374..56beed61a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,14 +3,22 @@ Release notes -Version v30.2.2 +Version v30.3.0 ---------------- - We enabled API throttling for a basic user and for a staff user they can have unlimited access on API. - We added throttle rate for each API endpoint and it can be - configured from the settings #991 https://github.com/nexB/vulnerablecode/issues/991. + configured from the settings #991 https://github.com/nexB/vulnerablecode/issues/991 + +- We improved how we import NVD data +- We refactored and made the purl2cpe script work to dump purl to CPE mappings + +Internally: + +- We aligned key names internally with the names used in the UI and API (such as affected and fixed) +- We now use querysets as model managers and have streamlined view code Version v30.2.1 diff --git a/docs/source/command-line-interface.rst b/docs/source/command-line-interface.rst index f4a02858d..d5afe3ac9 100644 --- a/docs/source/command-line-interface.rst +++ b/docs/source/command-line-interface.rst @@ -3,7 +3,7 @@ Command Line Interface ====================== -The main entry point is Django's :guilabel:`manage.py` management commands. +The main entry point is the Django :guilabel:`manage.py` management command script. ``$ ./manage.py --help`` ------------------------ @@ -14,9 +14,10 @@ VulnerableCode's own commands are listed under the ``[vulnerabilities]`` section $ ./manage.py --help ... [vulnerabilities] - create_cpe_to_purl_map - importer - improver + import + improve + purl2cpe + ``$ ./manage.py --help`` --------------------------------------- @@ -58,3 +59,17 @@ Other variations: * ``--list`` List all available improvers * ``--all`` Run all available improvers + + + +``$ ./manage.py purl2cpe --destination 2 and cpe_comps[2] == "h": - return True + vs = VulnerabilitySeverity( + system=severity_systems.CVSSV2_VECTOR, + value=str(cvss_v2.get("vectorString") or ""), + ) + severities.append(vs) - return False + return severities + @property + def reference_urls(self): + """ + Return a list unique of reference URLs. + """ + # FIXME: we should also collect additional data from the references such as tags and ids -def extract_cpes(cve_item): - """ - Return a list of CPEs for a given CVE item. - """ - cpes = set() - for node in get_item(cve_item, "configurations", "nodes") or []: - for cpe_data in node.get("cpe_match") or []: - cpe23_uri = cpe_data.get("cpe23Uri") - if cpe23_uri: - cpes.add(cpe23_uri) - return cpes + urls = [] + for reference in get_item(self.cve_item, "cve", "references", "reference_data") or []: + ref_url = reference.get("url") + if ref_url and ref_url.startswith(("http", "ftp")) and ref_url not in urls: + urls.append(ref_url) + return urls + @property + def references(self): + """ + Return a list of AdvisoryReference. + """ + # FIXME: we should also collect additional data from the references such as tags and ids + references = [] -def extract_severity_scores(cve_item): - """ - Yield a vulnerability severity for each `cve_item`. - """ - if not isinstance(cve_item, dict): - return None - impact = cve_item.get("impact") or {} - base_metric_v3 = impact.get("baseMetricV3") or {} - if base_metric_v3: - cvss_v3 = get_item(base_metric_v3, "cvssV3") - yield VulnerabilitySeverity( - system=severity_systems.CVSSV3, - value=str(cvss_v3.get("baseScore") or ""), - ) - yield VulnerabilitySeverity( - system=severity_systems.CVSSV3_VECTOR, - value=str(cvss_v3.get("vectorString") or ""), - ) + # we track each CPE as a reference for now + for cpe in self.cpes: + cpe_url = f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}" + references.append(Reference(reference_id=cpe, url=cpe_url)) - base_metric_v2 = impact.get("baseMetricV2") or {} - if base_metric_v2: - cvss_v2 = base_metric_v2.get("cvssV2") or {} - yield VulnerabilitySeverity( - system=severity_systems.CVSSV2, - value=str(cvss_v2.get("baseScore") or ""), + # FIXME: we also add the CVE proper as a reference, but is this correct? + references.append( + Reference( + url=f"https://nvd.nist.gov/vuln/detail/{self.cve_id}", + reference_id=self.cve_id, + severities=self.severities, + ) ) - yield VulnerabilitySeverity( - system=severity_systems.CVSSV2_VECTOR, - value=str(cvss_v2.get("vectorString") or ""), + + # clean to remove dupes for the CVE id proper + ref_urls = [ + ru + for ru in self.reference_urls + if ru != f"https://nvd.nist.gov/vuln/detail/{self.cve_id}" + ] + references.extend([Reference(url=url) for url in ref_urls]) + + return references + + @property + def is_related_to_hardware(self): + """ + Return True if this CVE item is for hardware (as opposed to software). + """ + return any(is_related_to_hardware(cpe) for cpe in self.cpes) + + def to_advisory(self): + """ + Return an AdvisoryData object from this CVE item + """ + return AdvisoryData( + aliases=[self.cve_id], + summary=self.summary, + references=self.references, + date_published=dateparser.parse(self.cve_item.get("publishedDate")), ) + + +def is_related_to_hardware(cpe): + """ + Return True if the ``cpe`` is related to hardware. + """ + cpe_comps = cpe.split(":") + # CPE follow the format cpe:cpe_version:product_type:vendor:product + return len(cpe_comps) > 2 and cpe_comps[2] == "h" diff --git a/vulnerabilities/management/commands/create_cpe_to_purl_map.py b/vulnerabilities/management/commands/create_cpe_to_purl_map.py deleted file mode 100644 index 15c95800b..000000000 --- a/vulnerabilities/management/commands/create_cpe_to_purl_map.py +++ /dev/null @@ -1,97 +0,0 @@ -# -# Copyright (c) nexB Inc. and others. All rights reserved. -# VulnerableCode is a trademark of nexB Inc. -# SPDX-License-Identifier: Apache-2.0 -# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. -# See https://github.com/nexB/vulnerablecode for support or download. -# See https://aboutcode.org for more information about nexB OSS projects. -# - -import json -import os -from datetime import date -from itertools import chain - -from django.core.management.base import BaseCommand - -from vulnerabilities import models -from vulnerabilities.importers.nvd import BASE_URL as nvd_base_url -from vulnerabilities.importers.nvd import NVDImporter as nvd_utils - - -class Command(BaseCommand): - """ - This script creates a mapping of CPEs to PURLs grouped by the affecting CVE. - It does this by doing the following: - 1. Iterate over all CVEs found in VulnerableCode's db. - 2. Look for the CVE being iterated upon in the NVD. - 3. Get the list of all CPEs which are affected by this CVE from NVD entry. - 4. Get the list of all PURLs which are affected by this CVE from VulnerableCode's db. - 5. Map the list of CPEs and PURLs from #3 and #4 together. - """ - - def add_arguments(self, parser): - - parser.add_argument( - "--vulnerable_purls_only", action="store_true", help="Map only vulnerable PURLs to CPEs" - ) - - parser.add_argument( - "--patched_purls_only", action="store_true", help="Map only patching PURLs to CPEs" - ) - - @staticmethod - def get_packages(vulnerability, vulnerable_purls_only, patched_purls_only): - if vulnerable_purls_only and not patched_purls_only: - return vulnerability.vulnerable_packages.all() - - elif patched_purls_only and not vulnerable_purls_only: - return vulnerability.patched_packages.all() - - return chain(vulnerability.patched_packages.all(), vulnerability.vulnerable_packages.all()) - - def handle(self, *args, **options): - current_year = date.today().year - # NVD json feeds start from 2002. - for year in range(2002, current_year + 1): - self.stdout.write(f"Processing CPEs from year {year}") - download_url = nvd_base_url.format(year) - nvd_data = nvd_utils.fetch(download_url) - - vulnerabilities = list( - models.Vulnerability.objects.filter(vulnerability_id__startswith=f"CVE-{year}") - .prefetch_related("vulnerable_packages") - .prefetch_related("patched_packages") - ) - - vulnerabilities = { - vulnerability.vulnerability_id: vulnerability for vulnerability in vulnerabilities - } - purl_cpe_mapping = [] - - for cve_item in nvd_data["CVE_Items"]: - cve_id = cve_item["cve"]["CVE_data_meta"]["ID"] - if cve_id not in vulnerabilities: - continue - - purl_cpe_mapping.append({}) - purl_cpe_mapping[-1]["cve_id"] = cve_id - purl_cpe_mapping[-1]["purls"] = [] - purl_cpe_mapping[-1]["cpes"] = list(nvd_utils.extract_cpes(cve_item)) - - packages = self.get_packages( - vulnerabilities[cve_id], - options["vulnerable_purls_only"], - options["patched_purls_only"], - ) - for package in packages: - purl_cpe_mapping[-1]["purls"].append(package.package_url) - - if not os.path.exists("cpe2purl"): - os.mkdir("cpe2purl") - - with open(os.path.join("cpe2purl", f"{year}.json"), "w") as f: - json.dump(purl_cpe_mapping, f, indent=4) - - path = os.path.abspath("cpe2purl") - self.stdout.write(self.style.SUCCESS(f"Successfully created the mappings. Check {path}")) diff --git a/vulnerabilities/management/commands/purl2cpe.py b/vulnerabilities/management/commands/purl2cpe.py new file mode 100644 index 000000000..277348615 --- /dev/null +++ b/vulnerabilities/management/commands/purl2cpe.py @@ -0,0 +1,108 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/nexB/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +import os +from collections import defaultdict + +import attr +from django.core.management.base import BaseCommand + +from vulnerabilities.models import Vulnerability + + +@attr.attributes +class Purl2Cpe: + vulnerablecode_id = attr.attrib(type=str) + cves = attr.attrib(default=attr.Factory(list), type=list) + purls = attr.attrib(default=attr.Factory(list), type=list, repr=False) + cpes = attr.attrib(default=attr.Factory(list), type=list, repr=False) + + def to_dict(self): + return attr.asdict(self) + + @classmethod + def collect(cls, limit=0, verbose=False): + """ + Yield Purl2Cpes collected from the current database. + Apply a limit of provided + """ + vulns = Vulnerability.objects.with_packages().with_cpes().distinct().all() + if limit: + vulns = vulns[:limit] + + for vuln in vulns: + if verbose: + print(f"Processing: {vuln.vulnerability_id}") + yield cls( + vulnerablecode_id=vuln.vulnerability_id, + cves=vuln.get_related_cves(), + purls=vuln.get_related_purls(), + cpes=vuln.get_related_cpes(), + ) + + @classmethod + def collect_by_years(cls, limit=0, verbose=False): + """ + Return a mapping of {CVE year: [list of Purl2Cpes]}. + Apply a limit of provided + """ + by_years = defaultdict(list) + for p2c in cls.collect(limit=limit, verbose=verbose): + for cve in p2c.cves: + try: + cve_year = cve.split("-")[1] + by_years[cve_year].append(p2c) + except Exception as e: + raise Exception(cve) from e + return by_years + + +class Command(BaseCommand): + """ + Dump JSON mappings of CPEs and Package URLs by vulnerability. + The process consists in these steps: + + - Iterate over all vulnerability with CPEs found in the VulnerableCode DB. + - Collect their CVEs, CPEs and purls joined together through the CVEs. + - Dump a list of Purl2Cpe grouped by year. + """ + + help = "Dump a mapping of CPEs to PURLs grouped by vulnerability." + + def add_arguments(self, parser): + + parser.add_argument( + "--limit", + default=0, + help="Limit the number of processed vulnerability", + ) + + parser.add_argument("destination", help="Destination directory") + + def handle(self, *args, **options): + limit = options["limit"] + if isinstance(limit, str): + limit = int(limit) + + destination = options["destination"] + assert destination, "Missing required estination directory" + destination = os.path.abspath(destination) + os.makedirs(destination, exist_ok=True) + + by_years = Purl2Cpe.collect_by_years(limit=limit) + + for year, purl2cpes in by_years.items(): + purl2cpes = [y.to_dict() for y in purl2cpes] + with open(os.path.join(destination, f"{year}.json"), "w") as out: + json.dump(purl2cpes, out, indent=2) + + print( + self.style.SUCCESS(f"Successfully dumped CPE to purl mappings in file://{destination}") + ) diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 30434cbfd..d07b6c2a8 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -52,6 +52,89 @@ def get_or_none(self, *args, **kwargs): return self.get(*args, **kwargs) +class VulnerabilityQuerySet(BaseQuerySet): + def with_cpes(self): + """ + Return a queryset of Vulnerability that have one or more NVD CPE references. + """ + return self.filter(vulnerabilityreference__reference_id__startswith="cpe") + + def for_cpe(self, cpe): + """ + Return a queryset of Vulnerability that have the ``cpe`` as an NVD CPE reference. + """ + return self.filter(vulnerabilityreference__reference_id__exact=cpe) + + def with_cves(self): + """ + Return a queryset of Vulnerability that have one or more NVD CVE aliases. + """ + return self.filter(aliases__alias__startswith="CVE") + + def for_cve(self, cve): + """ + Return a queryset of Vulnerability that have the the NVD CVE ``cve`` as an alias. + """ + return self.filter(vulnerabilityreference__reference_id__exact=cve) + + def with_packages(self): + """ + Return a queryset of Vulnerability that have one or more related packages. + """ + return self.filter(packages__isnull=False) + + def for_package(self, package): + """ + Return a queryset of Vulnerability related to ``package``. + """ + return self.filter(packages=package) + + def for_purl(self, package): + """ + Return a queryset of Vulnerability related to ``package``. + """ + return self.filter(packages=package) + + def search(self, query): + """ + Return a Vulnerability queryset searching for the ``query``. + Make a best effort approach to search a vulnerability. + """ + + qs = self + query = query and query.strip() + if not query: + return qs.none() + + # middle ground, exact on vulnerability_id + qssearch = qs.filter(vulnerability_id=query) + if not qssearch.exists(): + # middle ground, exact on alias + qssearch = qs.filter(aliases__alias=query) + if not qssearch.exists(): + # middle ground, slow enough + qssearch = qs.filter( + Q(vulnerability_id__icontains=query) | Q(aliases__alias__icontains=query) + ) + if not qssearch.exists(): + # last resort super slow + qssearch = qs.filter( + Q(references__id__icontains=query) | Q(summary__icontains=query) + ) + + return qssearch.order_by("vulnerability_id") + + def with_package_counts(self): + return self.annotate( + vulnerable_package_count=Count( + "packages", filter=Q(packagerelatedvulnerability__fix=False), distinct=True + ), + patched_package_count=Count( + "packages", filter=Q(packagerelatedvulnerability__fix=True), distinct=True + ), + ) + + class Vulnerability(models.Model): """ A software vulnerability with a unique identifier and alternate ``aliases``. @@ -80,6 +163,8 @@ class Vulnerability(models.Model): through="PackageRelatedVulnerability", ) + objects = VulnerabilityQuerySet.as_manager() + class Meta: verbose_name_plural = "Vulnerabilities" ordering = ["vulnerability_id"] @@ -87,39 +172,90 @@ class Meta: def __str__(self): return self.vulnerability_id + @property + def vcid(self): + return self.vulnerability_id + @property def severities(self): + """ + Return a queryset of VulnerabilitySeverity for this vulnerability. + """ return VulnerabilitySeverity.objects.filter(reference__in=self.references.all()) @property - def vulnerable_to(self): + def affected_packages(self): """ - Return packages that are vulnerable to this vulnerability. + Return a queryset of packages that are affected by this vulnerability. """ - return self.packages.vulnerable() + return self.packages.affected() + + # legacy aliases + vulnerable_packages = affected_packages @property - def resolved_to(self): + def fixed_by_packages(self): """ - Returns packages that first received patch against this vulnerability - in their particular version history. + Return a queryset of packages that are fixing this vulnerability. """ - return self.packages.filter(packagerelatedvulnerability__fix=True) + return self.packages.fixing() + + # legacy alias + patched_packages = fixed_by_packages @property - def alias(self): + def get_aliases(self): """ - Returns packages that first received patch against this vulnerability - in their particular version history. + Return a queryset of all Aliases for this vulnerability. """ return self.aliases.all() + alias = get_aliases + def get_absolute_url(self): """ - Return this Vulnerability details URL. + Return this Vulnerability details absolute URL. """ return reverse("vulnerability_details", args=[self.vulnerability_id]) + def get_related_cpes(self): + """ + Return a list of CPE strings of this vulnerability. + """ + return list(self.references.for_cpe().values_list("reference_id", flat=True).distinct()) + + def get_related_cves(self): + """ + Return a list of aliases CVE strings of this vulnerability. + """ + return list(self.aliases.for_cve().values_list("alias", flat=True).distinct()) + + def get_affected_purls(self): + """ + Return a list of purl strings affected by this vulnerability. + """ + return [p.package_url for p in self.affected_packages.all()] + + def get_fixing_purls(self): + """ + Return a list of purl strings fixing this vulnerability. + """ + return [p.package_url for p in self.fixed_by_packages.all()] + + def get_related_purls(self): + """ + Return a list of purl strings related to this vulnerability. + """ + return [p.package_url for p in self.packages.distinct().all()] + + +class VulnerabilityReferenceQuerySet(BaseQuerySet): + def for_cpe(self): + """ + Return a queryset of VulnerabilityReferences that are for a CPE. + """ + return self.filter(reference_id__startswith="cpe") + class VulnerabilityReference(models.Model): """ @@ -144,7 +280,7 @@ class VulnerabilityReference(models.Model): blank=True, ) - objects = BaseQuerySet.as_manager() + objects = VulnerabilityReferenceQuerySet.as_manager() class Meta: ordering = ["reference_id", "url"] @@ -153,6 +289,13 @@ def __str__(self): reference_id = f" {self.reference_id}" if self.reference_id else "" return f"{self.url}{reference_id}" + @property + def is_cpe(self): + """ + Return Trueis this is a CPE reference. + """ + return self.reference_id.startswith("cpe") + class VulnerabilityRelatedReference(models.Model): """ @@ -200,12 +343,20 @@ def for_package_url_object(self, purl): else: return self.none() - def vulnerable(self): + def affected(self): """ - Return all vulnerable packages. + Return only packages affected by a vulnerability. """ return self.filter(packagerelatedvulnerability__fix=False) + vulnerable = affected + + def fixing(self): + """ + Return only packages fixing a vulnerability . + """ + return self.filter(packagerelatedvulnerability__fix=True) + def with_vulnerability_counts(self): return self.annotate( vulnerability_count=Count( @@ -218,6 +369,110 @@ def with_vulnerability_counts(self): ), ) + def fixing_packages(self, package, with_qualifiers_and_subpath=True): + """ + Return a queryset of packages that are fixing the vulnerability of + ``package``. + """ + + return self.match_purl( + purl=package.purl, + with_qualifiers_and_subpath=with_qualifiers_and_subpath, + ).fixing() + + def search(self, query=None): + """ + Return a Package queryset searching for the ``query``. + Make a best effort approach to find matching packages either based + on exact purl, partial purl or just name and namespace. + """ + query = query and query.strip() + if not query: + return self.none() + + qs = self + if not query.startswith("pkg:"): + # treat this as a plain search + qs = qs.filter(Q(name__icontains=query) | Q(namespace__icontains=query)) + else: + # this looks like a purl: check if it quacks like a purl + purl_type = namespace = name = version = None + + _, _scheme, remainder = query.partition("pkg:") + remainder = remainder.strip() + if not remainder: + return qs.none() + + try: + # First, treat the query as a syntactically-correct purl + purl = PackageURL.from_string(query) + purl_type, namespace, name, version, _quals, _subp = purl.to_dict().values() + except ValueError: + # Otherwise, attempt a more lenient parsing of a possibly partial purl + if "/" in remainder: + purl_type, _scheme, ns_name = remainder.partition("/") + ns_name = ns_name.strip() + if ns_name: + if "/" in ns_name: + namespace, _, name = ns_name.partition("/") + else: + name = ns_name + name = name.strip() + if name: + if "@" in name: + name, _, version = name.partition("@") + version = version.strip() + name = name.strip() + else: + purl_type = remainder + + if purl_type: + qs = qs.filter(type__iexact=purl_type) + if namespace: + qs = qs.filter(namespace__iexact=namespace) + if name: + qs = qs.filter(name__iexact=name) + if version: + qs = qs.filter(version__iexact=version) + + return qs + + def for_purl(self, purl, with_qualifiers_and_subpath=True): + """ + Return a queryset matching the ``purl`` Package URL. + """ + if not isinstance(purl, PackageURL): + purl = PackageURL.from_string(purl) + purl = purl.to_dict() + if not with_qualifiers_and_subpath: + del purl["qualifiers"] + del purl["subpath"] + return self.filter(**purl) + + def with_cpes(self): + """ + Return a queryset of Package that a vulnerability with one or more NVD CPE references. + """ + return self.filter(vulnerabilities__vulnerabilityreference__reference_id__startswith="cpe") + + def for_cpe(self, cpe): + """ + Return a queryset of Vulnerability that have the ``cpe`` as an NVD CPE reference. + """ + return self.filter(vulnerabilities__vulnerabilityreference__reference_id__exact=cpe) + + def with_cves(self): + """ + Return a queryset of Vulnerability that have one or more NVD CVE aliases. + """ + return self.filter(vulnerabilities__aliases__alias__startswith="CVE") + + def for_cve(self, cve): + """ + Return a queryset of Vulnerability that have the the NVD CVE ``cve`` as an alias. + """ + return self.filter(vulnerabilities__vulnerabilityreference__reference_id__exact=cve) + def get_purl_query_lookups(purl): """ @@ -268,40 +523,39 @@ def __str__(self): @property # TODO: consider renaming to "affected_by" - def vulnerable_to(self): + def affected_by(self): """ - Returns vulnerabilities which are affecting this package. + Return a queryset of vulnerabilities affecting this package. """ return self.vulnerabilities.filter(packagerelatedvulnerability__fix=False) + # legacy aliases + vulnerable_to = affected_by + @property # TODO: consider renaming to "fixes" or "fixing" ? (TBD) and updating the docstring - def resolved_to(self): + def fixing(self): """ - Returns the vulnerabilities which this package is patched against. + Return a queryset of vulnerabilities fixed by this package. """ return self.vulnerabilities.filter(packagerelatedvulnerability__fix=True) + # legacy aliases + resolved_to = fixing + @property def fixed_packages(self): """ - Returns vulnerabilities which are affecting this package. + Return a queryset of packages that are fixed. """ - return Package.objects.filter( - name=self.name, - namespace=self.namespace, - type=self.type, - qualifiers=self.qualifiers, - subpath=self.subpath, - packagerelatedvulnerability__fix=True, - ).distinct() + return Package.objects.fixing_packages(package=self).distinct() @property def is_vulnerable(self) -> bool: """ Returns True if this package is vulnerable to any vulnerability. """ - return self.vulnerable_to.exists() + return self.affected_by.exists() def get_absolute_url(self): """ @@ -311,6 +565,9 @@ def get_absolute_url(self): class PackageRelatedVulnerability(models.Model): + """ + Track the relationship between a Package and Vulnerability. + """ # TODO: Fix related_name package = models.ForeignKey( @@ -322,6 +579,7 @@ class PackageRelatedVulnerability(models.Model): Vulnerability, on_delete=models.CASCADE, ) + created_by = models.CharField( max_length=100, blank=True, @@ -408,6 +666,14 @@ class Meta: ordering = ["reference", "scoring_system", "value"] +class AliasQuerySet(BaseQuerySet): + def for_cve(self): + """ + Return a queryset of Aliases that are for a CVE. + """ + return self.filter(alias__startswith="CVE") + + class Alias(models.Model): """ An alias is a unique vulnerability identifier in some database, such as @@ -432,6 +698,8 @@ class Alias(models.Model): related_name="aliases", ) + objects = AliasQuerySet.as_manager() + class Meta: ordering = ["alias"] diff --git a/vulnerabilities/templates/vulnerability_details.html b/vulnerabilities/templates/vulnerability_details.html index 4d1108a69..902270e0c 100644 --- a/vulnerabilities/templates/vulnerability_details.html +++ b/vulnerabilities/templates/vulnerability_details.html @@ -34,14 +34,14 @@
  • - Fixed by packages ({{ resolved_to|length }}) + Fixed by packages ({{ fixed_by_packages|length }})
  • - Affected packages ({{ vulnerable_to|length }}) + Affected packages ({{ affected_packages|length }})
  • @@ -118,11 +118,11 @@
    - Fixed by packages ({{ resolved_to|length }}) + Fixed by packages ({{ fixed_by_packages|length }})
    - {% for package in resolved_to|slice:":3" %} + {% for package in fixed_by_packages|slice:":3" %} {% endfor %} - {% if resolved_to|length > 3 %} + {% if fixed_by_packages|length > 3 %}
    {{ package.purl }} @@ -136,7 +136,7 @@
    ... see Fixed by packages tab for more @@ -147,11 +147,11 @@
    - Affected packages ({{ vulnerable_to|length }}) + Affected packages ({{ affected_packages|length }})
    - {% for package in vulnerable_to|slice:":3" %} + {% for package in affected_packages|slice:":3" %} {% endfor %} - {% if vulnerable_to|length > 3 %} + {% if affected_packages|length > 3 %} - {% for package in vulnerable_to %} + {% for package in affected_packages %} - {% for package in resolved_to %} + {% for package in fixed_by_packages %}
    {{ package.purl }} @@ -165,7 +165,7 @@
    ... see Affected packages tab for more @@ -215,7 +215,7 @@
    {{ package.purl }} @@ -244,7 +244,7 @@
    {{ package.purl }} diff --git a/vulnerabilities/tests/test_nvd.py b/vulnerabilities/tests/test_nvd.py index fad4fdce8..6b46a3c03 100644 --- a/vulnerabilities/tests/test_nvd.py +++ b/vulnerabilities/tests/test_nvd.py @@ -10,11 +10,7 @@ import json import os -from vulnerabilities.importers.nvd import extract_cpes -from vulnerabilities.importers.nvd import extract_reference_urls -from vulnerabilities.importers.nvd import extract_summary -from vulnerabilities.importers.nvd import related_to_hardware -from vulnerabilities.importers.nvd import to_advisories +from vulnerabilities.importers import nvd BASE_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA = os.path.join(BASE_DIR, "test_data/nvd/nvd_test.json") @@ -25,10 +21,24 @@ def load_test_data(): return json.load(f) -def test_nvd_importer_with_hardware(regen=False): +def sorted_advisory_data(advisory_data): + """ + Sorted nested lists in a list of AdvisoryData mappings. + """ + sorter = lambda dct: tuple(dct.items()) + for data in advisory_data: + data["aliases"] = sorted(data["aliases"]) + data["affected_packages"] = sorted(data["affected_packages"], key=sorter) + data["references"] = sorted(data["references"], key=sorter) + return advisory_data + + +def test_to_advisories_skips_hardware(regen=False): expected_file = os.path.join(BASE_DIR, "test_data/nvd/nvd-expected.json") - result = [data.to_dict() for data in list(to_advisories(load_test_data()))] + test_data = load_test_data() + result = [data.to_dict() for data in nvd.to_advisories(test_data)] + result = sorted_advisory_data(result) if regen: with open(expected_file, "w") as f: @@ -37,11 +47,13 @@ def test_nvd_importer_with_hardware(regen=False): else: with open(expected_file) as f: expected = json.load(f) + expected = sorted_advisory_data(expected) assert result == expected -def get_cve_item(): +# TODO: use a JSON fixtures instead +def get_test_cve_item(): return { "cve": { @@ -127,49 +139,38 @@ def get_cve_item(): } -def test_extract_cpes(): - expected_cpes = { +def test_CveItem_cpes(): + expected_cpes = [ "cpe:2.3:a:csilvers:gperftools:0.1:*:*:*:*:*:*:*", "cpe:2.3:a:csilvers:gperftools:0.2:*:*:*:*:*:*:*", "cpe:2.3:a:csilvers:gperftools:*:*:*:*:*:*:*:*", - } - - found_cpes = set() - found_cpes.update(extract_cpes(get_cve_item())) + ] + found_cpes = nvd.CveItem(cve_item=get_test_cve_item()).cpes assert found_cpes == expected_cpes -def test_related_to_hardware(): - assert ( - related_to_hardware( - cpes=[ - "cpe:2.3:a:csilvers:gperftools:0.1:*:*:*:*:*:*:*", - "cpe:2.3:h:csilvers:gperftools:0.2:*:*:*:*:*:*:*", - "cpe:2.3:a:csilvers:gperftools:*:*:*:*:*:*:*:*", - ] - ) - == True - ) +def test_is_related_to_hardware(): + assert nvd.is_related_to_hardware("cpe:2.3:h:csilvers:gperftools:0.2:*:*:*:*:*:*:*") + assert not nvd.is_related_to_hardware("cpe:2.3:a:csilvers:gperftools:0.1:*:*:*:*:*:*:*") + assert not nvd.is_related_to_hardware("cpe:2.3:a:csilvers:gperftools:*:*:*:*:*:*:*:*") -def test_extract_summary_with_single_summary(): +def test_CveItem_summary_with_single_summary(): expected_summary = ( "Multiple integer overflows in TCMalloc (tcmalloc.cc) in gperftools " "before 0.4 make it easier for context-dependent attackers to perform memory-related " "attacks such as buffer overflows via a large size value, which causes less memory to " "be allocated than expected." ) - found_summary = extract_summary(get_cve_item()) - assert found_summary == expected_summary + + assert nvd.CveItem(cve_item=get_test_cve_item()).summary == expected_summary -def test_extract_reference_urls(): - expected_urls = { +def test_CveItem_reference_urls(): + expected_urls = [ "http://code.google.com/p/gperftools/source/browse/tags/perftools-0.4/ChangeLog", "http://kqueue.org/blog/2012/03/05/memory-allocator-security-revisited/", - } - - found_urls = extract_reference_urls(get_cve_item()) + ] - assert found_urls == expected_urls + assert nvd.CveItem(cve_item=get_test_cve_item()).reference_urls == expected_urls diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index 6c85e5faa..6b761a9fc 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -10,8 +10,6 @@ from django.contrib import messages from django.core.exceptions import ValidationError from django.core.mail import send_mail -from django.db.models import Count -from django.db.models import Q from django.http.response import Http404 from django.shortcuts import redirect from django.shortcuts import render @@ -20,7 +18,6 @@ from django.views import generic from django.views.generic.detail import DetailView from django.views.generic.list import ListView -from packageurl import PackageURL from vulnerabilities import models from vulnerabilities.forms import ApiUserCreationForm @@ -50,67 +47,8 @@ def get_queryset(self, query=None): Make a best effort approach to find matching packages either based on exact purl, partial purl or just name and namespace. """ - qs = self.model.objects - query = query or self.request.GET.get("search") or "" - query = query.strip() - if not query: - return qs.none() - - if not query.startswith("pkg:"): - # treat this as a plain search - qs = qs.filter(Q(name__icontains=query) | Q(namespace__icontains=query)) - else: - # this looks like a purl: check if it quacks like a purl - purl_type = namespace = name = version = None - - _, _scheme, remainder = query.partition("pkg:") - remainder = remainder.strip() - if not remainder: - return qs.none() - - try: - # First, treat the query as a syntactically-correct purl - purl = PackageURL.from_string(query) - purl_type, namespace, name, version, _quals, _subp = purl.to_dict().values() - except ValueError: - # Otherwise, attempt a more lenient parsing of a possibly partial purl - if "/" in remainder: - purl_type, _scheme, ns_name = remainder.partition("/") - ns_name = ns_name.strip() - if ns_name: - if "/" in ns_name: - namespace, _, name = ns_name.partition("/") - else: - name = ns_name - name = name.strip() - if name: - if "@" in name: - name, _, version = name.partition("@") - version = version.strip() - name = name.strip() - else: - purl_type = remainder - - if purl_type: - qs = qs.filter(type__iexact=purl_type) - if namespace: - qs = qs.filter(namespace__iexact=namespace) - if name: - qs = qs.filter(name__iexact=name) - if version: - qs = qs.filter(version__iexact=version) - - return qs.annotate( - vulnerability_count=Count( - "vulnerabilities", - filter=Q(packagerelatedvulnerability__fix=False), - ), - patched_vulnerability_count=Count( - "vulnerabilities", - filter=Q(packagerelatedvulnerability__fix=True), - ), - ).prefetch_related() + return self.model.objects.search(query).with_vulnerability_counts().prefetch_related() class VulnerabilitySearch(ListView): @@ -128,35 +66,7 @@ def get_context_data(self, **kwargs): def get_queryset(self, query=None): query = query or self.request.GET.get("search") or "" - qs = self.model.objects - query = query.strip() - if not query: - return qs.none() - - # middle ground, exact on vulnerability_id - qssearch = qs.filter(vulnerability_id=query) - if not qssearch.exists(): - # middle ground, exact on alias - qssearch = qs.filter(aliases__alias=query) - if not qssearch.exists(): - # middle ground, slow enough - qssearch = qs.filter( - Q(vulnerability_id__icontains=query) | Q(aliases__alias__icontains=query) - ) - if not qssearch.exists(): - # last resort super slow - qssearch = qs.filter( - Q(references__id__icontains=query) | Q(summary__icontains=query) - ) - - return qssearch.order_by("vulnerability_id").annotate( - vulnerable_package_count=Count( - "packages", filter=Q(packagerelatedvulnerability__fix=False), distinct=True - ), - patched_package_count=Count( - "packages", filter=Q(packagerelatedvulnerability__fix=True), distinct=True - ), - ) + return self.model.objects.search(query=query).with_package_counts() class PackageDetails(DetailView): @@ -169,8 +79,8 @@ def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) package = self.object context["package"] = package - context["affected_by_vulnerabilities"] = package.vulnerable_to.order_by("vulnerability_id") - context["fixing_vulnerabilities"] = package.resolved_to.order_by("vulnerability_id") + context["affected_by_vulnerabilities"] = package.affected_by.order_by("vulnerability_id") + context["fixing_vulnerabilities"] = package.fixing.order_by("vulnerability_id") context["package_search_form"] = PackageSearchForm(self.request.GET) return context @@ -212,8 +122,8 @@ def get_context_data(self, **kwargs): "severities": list(self.object.severities), "references": self.object.references.all(), "aliases": self.object.aliases.all(), - "resolved_to": self.object.resolved_to.all(), - "vulnerable_to": self.object.vulnerable_to.all(), + "affected_packages": self.object.affected_packages.all(), + "fixed_by_packages": self.object.fixed_by_packages.all(), } ) return context @@ -240,7 +150,7 @@ def form_valid(self, form): try: response = super().form_valid(form) - except ValidationError as e: + except ValidationError: messages.error(self.request, "Email is invalid or already taken") return redirect(self.get_success_url()) diff --git a/vulnerablecode/__init__.py b/vulnerablecode/__init__.py index 1e68626f1..e415fce2e 100644 --- a/vulnerablecode/__init__.py +++ b/vulnerablecode/__init__.py @@ -12,7 +12,7 @@ import warnings from pathlib import Path -__version__ = "30.2.1" +__version__ = "30.3.0" def command_line(): diff --git a/vulnerablecode/settings.py b/vulnerablecode/settings.py index c8e9550d3..6ad02f61f 100644 --- a/vulnerablecode/settings.py +++ b/vulnerablecode/settings.py @@ -294,6 +294,11 @@ "TAGS_SORTER": False, } + +if not VULNERABLECODEIO_REQUIRE_AUTHENTICATION: + REST_FRAMEWORK["DEFAULT_PERMISSION_CLASSES"] = ("rest_framework.permissions.AllowAny",) + + if DEBUG_TOOLBAR: INSTALLED_APPS += ("debug_toolbar",) @@ -319,6 +324,3 @@ INTERNAL_IPS = [ "127.0.0.1", ] - -if not VULNERABLECODEIO_REQUIRE_AUTHENTICATION: - REST_FRAMEWORK["DEFAULT_PERMISSION_CLASSES"] = ("rest_framework.permissions.AllowAny",)