Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
from vulnerabilities.importers import nginx
from vulnerabilities.importers import nvd
from vulnerabilities.importers import openssl
from vulnerabilities.importers import redhat

IMPORTERS_REGISTRY = [
nginx.NginxImporter,
alpine_linux.AlpineImporter,
github.GitHubAPIImporter,
nvd.NVDImporter,
openssl.OpensslImporter,
redhat.RedhatImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
146 changes: 77 additions & 69 deletions vulnerabilities/importers/redhat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,81 +20,99 @@
# VulnerableCode is a free software code from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import logging
from typing import Dict
from typing import Iterable
from typing import List

import requests
from packageurl import PackageURL
from univers.version_range import RpmVersionRange

from vulnerabilities import severity_systems
from vulnerabilities.helpers import nearest_patched_package
from vulnerabilities.helpers import get_item
from vulnerabilities.helpers import requests_with_5xx_retry
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.rpm_utils import rpm_to_purl


class RedhatImporter(Importer):
def __enter__(self):

self.redhat_cves = fetch()

def updated_advisories(self):
processed_advisories = list(map(to_advisory, self.redhat_cves))
return self.batch_advisories(processed_advisories)

logger = logging.getLogger(__name__)

requests_session = requests_with_5xx_retry(max_retries=5, backoff_factor=1)


def fetch():
"""
Return a list of CVE data mappings fetched from the RedHat API.
See:
https://access.redhat.com/documentation/en-us/red_hat_security_data_api/1.0/html/red_hat_security_data_api/index
"""
cves = []
def fetch_list_of_cves() -> Iterable[List[Dict]]:
page_no = 1
url_template = "https://access.redhat.com/hydra/rest/securitydata/cve.json?per_page=10000&page={}" # nopep8

cve_data = None
while True:
current_url = url_template.format(page_no)
current_url = f"https://access.redhat.com/hydra/rest/securitydata/cve.json?per_page=10000&page={page_no}" # nopep8
try:
print(f"Fetching: {current_url}")
response = requests_session.get(current_url)
if response.status_code != requests.codes.ok:
# TODO: log me
print(f"Failed to fetch results from {current_url}")
logger.error(f"Failed to fetch results from {current_url}")
break
cve_data = response.json()
except Exception as e:
# TODO: log me
msg = f"Failed to fetch results from {current_url}:\n{e}"
print(msg)
logger.error(f"Failed to fetch results from {current_url} {e}")
break

if not cve_data:
break
cves.extend(cve_data)
page_no += 1
yield cve_data


def get_bugzilla_data(bugzilla):
return requests_session.get(f"https://bugzilla.redhat.com/rest/bug/{bugzilla}").json()

return cves

def get_rhsa_data(rh_adv):
return requests_session.get(
f"https://access.redhat.com/hydra/rest/securitydata/cvrf/{rh_adv}.json"
).json()


class RedhatImporter(Importer):

spdx_license_expression = "CC-BY-4.0"
license_url = "https://access.redhat.com/documentation/en-us/red_hat_security_data_api/1.0/html/red_hat_security_data_api/legal-notice"

def advisory_data(self) -> Iterable[AdvisoryData]:
for list_of_redhat_cves in fetch_list_of_cves():
for redhat_cve in list_of_redhat_cves:
yield to_advisory(redhat_cve)


def to_advisory(advisory_data):
affected_purls = []
if advisory_data.get("affected_packages"):
for rpm in advisory_data["affected_packages"]:
purl = rpm_to_purl(rpm)
if purl:
affected_purls.append(purl)
affected_packages: List[AffectedPackage] = []
for rpm in advisory_data.get("affected_packages") or []:
purl = rpm_to_purl(rpm_string=rpm, namespace="redhat")
if purl:
try:
affected_version_range = RpmVersionRange.from_versions(sequence=[purl.version])
affected_packages.append(
AffectedPackage(
package=PackageURL(
type=purl.type,
name=purl.name,
namespace=purl.namespace,
qualifiers=purl.qualifiers,
subpath=purl.subpath,
),
affected_version_range=affected_version_range,
fixed_version=None,
)
)
except Exception as e:
logger.error(f"Failed to parse version range {purl.version} for {purl} {e}")

references = []
bugzilla = advisory_data.get("bugzilla")
if bugzilla:
url = "https://bugzilla.redhat.com/show_bug.cgi?id={}".format(bugzilla)
bugzilla_data = requests_session.get(
f"https://bugzilla.redhat.com/rest/bug/{bugzilla}"
).json()
bugzilla_data = get_bugzilla_data(bugzilla)
if (
bugzilla_data.get("bugs")
and len(bugzilla_data["bugs"])
Expand All @@ -114,25 +132,28 @@ def to_advisory(advisory_data):
)
)

for rh_adv in advisory_data["advisories"]:
for rh_adv in advisory_data.get("advisories") or []:
# RH provides 3 types of advisories RHSA, RHBA, RHEA. Only RHSA's contain severity score.
# See https://access.redhat.com/articles/2130961 for more details.

if not isinstance(rh_adv, str):
logger.error(f"Invalid advisory type {rh_adv}")
continue

if "RHSA" in rh_adv.upper():
rhsa_data = requests_session.get(
f"https://access.redhat.com/hydra/rest/securitydata/cvrf/{rh_adv}.json"
).json() # nopep8
rhsa_data = get_rhsa_data(rh_adv)

rhsa_aggregate_severities = []
if rhsa_data.get("cvrfdoc"):
# not all RHSA errata have a corresponding CVRF document
value = rhsa_data["cvrfdoc"]["aggregate_severity"]
rhsa_aggregate_severities.append(
VulnerabilitySeverity(
system=severity_systems.REDHAT_AGGREGATE,
value=value,
value = get_item(rhsa_data, "cvrfdoc", "aggregate_severity")
if value:
rhsa_aggregate_severities.append(
VulnerabilitySeverity(
system=severity_systems.REDHAT_AGGREGATE,
value=value,
)
)
)

references.append(
Reference(
Expand Down Expand Up @@ -164,27 +185,14 @@ def to_advisory(advisory_data):
)
)

aliases = []
alias = advisory_data.get("CVE")
if alias:
aliases.append(alias)
references.append(Reference(severities=redhat_scores, url=advisory_data["resource_url"]))
return AdvisoryData(
vulnerability_id=advisory_data["CVE"],
summary=advisory_data["bugzilla_description"],
affected_packages=nearest_patched_package(affected_purls, []),
aliases=aliases,
summary=advisory_data.get("bugzilla_description") or "",
affected_packages=affected_packages,
references=references,
)


def rpm_to_purl(rpm_string):
# FIXME: there is code in scancode to handle RPM conversion AND this should
# be all be part of the packageurl library

# FIXME: the comment below is not correct, this is the Epoch in the RPM version and not redhat specific
# Red Hat uses `-:0` instead of just `-` to separate
# package name and version
components = rpm_string.split("-0:")
if len(components) != 2:
return

name, version = components

if version[0].isdigit():
return PackageURL(namespace="redhat", name=name, type="rpm", version=version)
117 changes: 117 additions & 0 deletions vulnerabilities/rpm_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# http://nexb.com and https://github.com/nexB/vulnerablecode/
# The VulnerableCode software is licensed under the Apache License version 2.0.
# Data generated with VulnerableCode require an acknowledgment.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# When you publish or redistribute any data created with VulnerableCode or any VulnerableCode
# derivative work, you must accompany this data with the following acknowledgment:
#
# Generated with VulnerableCode and provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# VulnerableCode should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
# VulnerableCode is a free software code from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import logging
import re
from collections import namedtuple

from packageurl import PackageURL

logger = logging.getLogger(__name__)

# This code has been vendored from scancode.

# https://github.com/nexB/scancode-toolkit/blob/16ae20a343c5332114edac34c7b6fcf2fb6bca74/src/packagedcode/rpm.py#L91
class EVR(namedtuple("EVR", "epoch version release")):
"""
The RPM Epoch, Version, Release tuple.
"""

def __new__(self, version, release=None, epoch=None):
"""
note: the sort order of the named tuple is the sort order.
But for creation we put the rarely used epoch last with a default to None.
"""
if not isinstance(epoch, int):
if epoch and epoch.strip():
logger.error("Invalid epoch: must be a number or empty.")
return None
if not version:
logger.error("Version is required: {}".format(repr(version)))
return None

return super().__new__(EVR, epoch, version, release)

def __str__(self, *args, **kwargs):
return self.to_string()

def to_string(self):
if self.release:
vr = f"{self.version}-{self.release}"
else:
vr = self.version

if self.epoch:
vr = ":".join([str(self.epoch), vr])
return vr


# https://github.com/nexB/scancode-toolkit/blob/16ae20a343c5332114edac34c7b6fcf2fb6bca74/src/packagedcode/nevra.py#L36
def from_name(rpm_string):
"""
Return an (E, N, V, R, A) tuple given a file name, by splitting
[e:]name-version-release.arch into the four possible subcomponents.
Default epoch, version, release and arch to None if not specified.
Accepts RPM names with and without extensions
"""
parse_nevra = re.compile("^" "(.*)" "-" "([^-]*)" "-" "([^-]*)" "\\." "([^.]*)" "$").match
m = parse_nevra(rpm_string)
if not m:
return None
n, v, r, a = m.groups()
if ":" not in v:
return None, n, v, r, a
e, v = v.split(":", 1)
if e.isdigit():
e = int(e)
return (e, n, v, r, a)


def rpm_to_purl(rpm_string, namespace):
# FIXME: there is code in scancode to handle RPM conversion AND this should
# be all be part of the packageurl library

# FIXME: the comment below is not correct, this is the Epoch in the RPM version and not redhat specific
# Red Hat uses `-:0` instead of just `-` to separate
# package name and version

# https://github.com/nexB/scancode-toolkit/blob/16ae20a343c5332114edac34c7b6fcf2fb6bca74/src/packagedcode/rpm.py#L310

envra = from_name(rpm_string)

if not envra:
logger.error(f"Invalid RPM name can't get envra: {rpm_string}")
return None
sepoch, sname, sversion, srel, sarch = envra

evr = EVR(sversion, srel, sepoch)
if not evr:
logger.error(f"Invalid RPM name can't get evr: {rpm_string}")
return None
src_evr = evr.to_string()
src_qualifiers = {}
if sarch:
src_qualifiers["arch"] = sarch

return PackageURL(
type="rpm", namespace=namespace, name=sname, version=src_evr, qualifiers=src_qualifiers
)
2 changes: 1 addition & 1 deletion vulnerabilities/templates/vulnerability.html
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ <h3>Severity</h3>
<th> Found At </th>
</tr>
{% for ref in object_list %}
{% for obj in ref.scores %}
{% for obj in ref.severities %}
<tr>
<td>{{obj.scoring_system}}</td>

Expand Down
1 change: 0 additions & 1 deletion vulnerabilities/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def no_rmtree(monkeypatch):
"test_npm.py",
"test_package_managers.py",
"test_postgresql.py",
"test_redhat_importer.py",
"test_retiredotnet.py",
"test_ruby.py",
"test_rust.py",
Expand Down
Loading