Merged

Commits (18)
a0e6be9
Merge pull request #7 from nexB/main
AmitGupta7580 Apr 4, 2021
e9172cb
Update Apache httpd importer from importing XML to JSON data
AmitGupta7580 Apr 4, 2021
4bd6a38
Add severity system and test file for apache_httpd
AmitGupta7580 Apr 5, 2021
c25b48c
Add new Severity System for Apache_httpd Severity
AmitGupta7580 Apr 6, 2021
5ac506d
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 6, 2021
9a7c438
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 6, 2021
7dba53f
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 6, 2021
db9ec0d
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 14, 2021
2bc3acc
Improve the format of apache_httpd importer
AmitGupta7580 Apr 15, 2021
05b1ef6
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 22, 2021
a4b4633
rebased and refactored the apache_httpd importer according to the new…
AmitGupta7580 Apr 22, 2021
28f44ef
Resolve version ranges into discrete versions and fetch all released …
AmitGupta7580 Apr 25, 2021
6537967
Resolve the problem of adding format string in etag and improve the c…
AmitGupta7580 Apr 28, 2021
f165d16
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 28, 2021
4157037
Improve data extraction from JSON data
AmitGupta7580 Apr 29, 2021
996ef17
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 Apr 29, 2021
5057e23
Merge branch 'main' into update-apache-httpd-importer
AmitGupta7580 May 6, 2021
223cabe
Removed Etag check
AmitGupta7580 May 6, 2021
154 changes: 110 additions & 44 deletions vulnerabilities/importers/apache_httpd.py
@@ -20,16 +20,25 @@
# VulnerableCode is a free software tool from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import asyncio
import dataclasses
from xml.etree import ElementTree
import urllib

import requests
from bs4 import BeautifulSoup
from packageurl import PackageURL
from univers.versions import MavenVersion
from univers.version_specifier import VersionSpecifier

from vulnerabilities.data_source import Advisory
from vulnerabilities.data_source import DataSource
from vulnerabilities.data_source import DataSourceConfiguration
from vulnerabilities.data_source import Reference
from vulnerabilities.data_source import VulnerabilitySeverity
from vulnerabilities.package_managers import GitHubTagsAPI
from vulnerabilities.severity_systems import scoring_systems
from vulnerabilities.helpers import create_etag
Collaborator

Nit: this isn't used anymore.

from vulnerabilities.helpers import nearest_patched_package


@dataclasses.dataclass
@@ -40,56 +49,113 @@ class ApacheHTTPDDataSourceConfiguration(DataSourceConfiguration):
class ApacheHTTPDDataSource(DataSource):

CONFIG_CLASS = ApacheHTTPDDataSourceConfiguration
url = "https://httpd.apache.org/security/vulnerabilities-httpd.xml"
base_url = "https://httpd.apache.org/security/json/"

def set_api(self):
self.version_api = GitHubTagsAPI()
asyncio.run(self.version_api.load_api(["apache/httpd"]))

def updated_advisories(self):
# Etags are like hashes of web responses. We maintain
# (url, etag) mappings in the DB. `create_etag` creates a
# (url, etag) pair. If a (url, etag) pair already exists, the code
# skips processing the response further to avoid duplicate work

if create_etag(data_src=self, url=self.url, etag_key="ETag"):
data = fetch_xml(self.url)
advisories = to_advisories(data)
return self.batch_advisories(advisories)

return []


def to_advisories(data):
advisories = []
for issue in data:
resolved_packages = []
impacted_packages = []
for info in issue:
if info.tag == "cve":
cve = info.attrib["name"]

if info.tag == "title":
summary = info.text

if info.tag == "fixed":
resolved_packages.append(
PackageURL(type="apache", name="httpd", version=info.attrib["version"])
links = fetch_links(self.base_url)
self.set_api()
advisories = []
for link in links:
data = requests.get(link).json()
advisories.append(self.to_advisory(data))
return self.batch_advisories(advisories)

def to_advisory(self, data):
cve = data["CVE_data_meta"]["ID"]
descriptions = data["description"]["description_data"]
description = None
for desc in descriptions:
if desc["lang"] == "eng":
description = desc.get("value")
break

severities = []
impacts = data.get("impact", [])
for impact in impacts:
value = impact.get("other")
if value:
severities.append(
VulnerabilitySeverity(
system=scoring_systems["apache_httpd"],
value=value,
)
)
break
reference = Reference(
reference_id=cve,
url=urllib.parse.urljoin(self.base_url, f"{cve}.json"),
severities=severities,
)

if info.tag == "affects" or info.tag == "maybeaffects":
impacted_packages.append(
PackageURL(type="apache", name="httpd", version=info.attrib["version"])
)
versions_data = []
for vendor in data["affects"]["vendor"]["vendor_data"]:
for products in vendor["product"]["product_data"]:
for version_data in products["version"]["version_data"]:
versions_data.append(version_data)

advisories.append(
Advisory(
vulnerability_id=cve,
summary=summary,
impacted_package_urls=impacted_packages,
resolved_package_urls=resolved_packages,
fixed_version_ranges, affected_version_ranges = self.to_version_ranges(versions_data)

affected_packages = []
fixed_packages = []

for version_range in fixed_version_ranges:
fixed_packages.extend(
[
PackageURL(type="apache", name="httpd", version=version)
for version in self.version_api.get("apache/httpd")
if MavenVersion(version) in version_range
]
)

for version_range in affected_version_ranges:
affected_packages.extend(
[
PackageURL(type="apache", name="httpd", version=version)
for version in self.version_api.get("apache/httpd")
if MavenVersion(version) in version_range
]
)

return Advisory(
vulnerability_id=cve,
summary=description,
affected_packages=nearest_patched_package(affected_packages, fixed_packages),
references=[reference],
)

return advisories
def to_version_ranges(self, versions_data):
fixed_version_ranges = []
affected_version_ranges = []
for version_data in versions_data:
version_value = version_data["version_value"]
range_expression = version_data["version_affected"]
if range_expression == "<":
fixed_version_ranges.append(
VersionSpecifier.from_scheme_version_spec_string(
"maven", ">={}".format(version_value)
)
)
elif range_expression == "=" or range_expression == "?=":
affected_version_ranges.append(
VersionSpecifier.from_scheme_version_spec_string(
"maven", "{}".format(version_value)
)
)

return (fixed_version_ranges, affected_version_ranges)
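
To make the range-resolution step above concrete, here is a minimal, self-contained sketch. It is hedged: the list of known releases is hard-coded where the importer would call self.version_api.get("apache/httpd"), and the variable names are illustrative, but the univers calls are the same ones used in the diff.

from packageurl import PackageURL
from univers.version_specifier import VersionSpecifier
from univers.versions import MavenVersion

# Known httpd releases; the importer fetches these via GitHubTagsAPI.
known_versions = ["1.3.0", "1.3.1", "1.3.2"]

# A "<" entry with version_value "1.3.2" in the CVE JSON becomes a ">=1.3.2"
# fixed range in to_version_ranges().
fixed_range = VersionSpecifier.from_scheme_version_spec_string("maven", ">=1.3.2")

# Keep only the known versions that fall inside the range and wrap them as purls.
fixed_packages = [
    PackageURL(type="apache", name="httpd", version=version)
    for version in known_versions
    if MavenVersion(version) in fixed_range
]
# fixed_packages now holds pkg:apache/httpd@1.3.2 only.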


def fetch_xml(url):
resp = requests.get(url).content
return ElementTree.fromstring(resp)
def fetch_links(url):
links = []
data = requests.get(url).content
soup = BeautifulSoup(data, features="lxml")
for tag in soup.find_all("a"):
link = tag.get("href")
if not link.endswith("json"):
continue
links.append(urllib.parse.urljoin(url, link))
return links
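
For reference, a standalone sketch of the link-discovery step. The function name fetch_advisory_links is illustrative and not part of the PR; unlike fetch_links above, it also skips anchor tags that have no href attribute, which would otherwise raise AttributeError on link.endswith().

import urllib.parse

import requests
from bs4 import BeautifulSoup


def fetch_advisory_links(index_url="https://httpd.apache.org/security/json/"):
    """Collect absolute URLs of the per-CVE JSON files linked from the index page."""
    soup = BeautifulSoup(requests.get(index_url).content, features="lxml")
    links = []
    for tag in soup.find_all("a"):
        href = tag.get("href")
        if href and href.endswith("json"):
            links.append(urllib.parse.urljoin(index_url, href))
    return links


if __name__ == "__main__":
    # Each link looks like https://httpd.apache.org/security/json/CVE-1999-1199.json
    for link in fetch_advisory_links():
        print(link)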
5 changes: 5 additions & 0 deletions vulnerabilities/severity_systems.py
@@ -87,4 +87,9 @@ def as_score(self, value):
url="",
notes="Severity for unknown scoring systems. Contains generic textual values like High, Low etc",
),
"apache_httpd": ScoringSystem(
identifier="apache_httpd",
name="Apache Httpd Severity",
url="https://httpd.apache.org/security/impact_levels.html",
),
}
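
With this entry in place, an importer can attach an Apache httpd severity the same way the importer above does. A brief hedged sketch ("important" is one of the textual levels documented at the impact_levels.html URL above):

from vulnerabilities.data_source import VulnerabilitySeverity
from vulnerabilities.severity_systems import scoring_systems

# Wrap a textual impact level from the Apache security pages in the new system.
severity = VulnerabilitySeverity(
    system=scoring_systems["apache_httpd"],
    value="important",
)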
124 changes: 124 additions & 0 deletions vulnerabilities/tests/test_apache_httpd.py
@@ -0,0 +1,124 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# http://nexb.com and https://github.com/nexB/vulnerablecode/
# The VulnerableCode software is licensed under the Apache License version 2.0.
# Data generated with VulnerableCode require an acknowledgment.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# When you publish or redistribute any data created with VulnerableCode or any VulnerableCode
# derivative work, you must accompany this data with the following acknowledgment:
#
# Generated with VulnerableCode and provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# VulnerableCode should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
# VulnerableCode is a free software from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import os
import json
from unittest import TestCase

from packageurl import PackageURL
from univers.version_specifier import VersionSpecifier

from vulnerabilities.data_source import Reference
from vulnerabilities.data_source import Advisory
from vulnerabilities.data_source import VulnerabilitySeverity
from vulnerabilities.package_managers import GitHubTagsAPI
from vulnerabilities.severity_systems import scoring_systems
from vulnerabilities.importers.apache_httpd import ApacheHTTPDDataSource
from vulnerabilities.helpers import AffectedPackage

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data", "apache_httpd", "CVE-1999-1199.json")


class TestApacheHTTPDDataSource(TestCase):
@classmethod
def setUpClass(cls):
data_source_cfg = {"etags": {}}
cls.data_src = ApacheHTTPDDataSource(1, config=data_source_cfg)
known_versions = ["1.3.2", "1.3.1", "1.3.0"]
cls.data_src.version_api = GitHubTagsAPI(cache={"apache/httpd": known_versions})
with open(TEST_DATA) as f:
cls.data = json.load(f)

def test_to_version_ranges(self):
data = [
{
"version_affected": "?=",
"version_value": "1.3.0",
},
{
"version_affected": "=",
"version_value": "1.3.1",
},
{
"version_affected": "<",
"version_value": "1.3.2",
},
]
fixed_version_ranges, affected_version_ranges = self.data_src.to_version_ranges(data)

# Check fixed packages
assert [
VersionSpecifier.from_scheme_version_spec_string("maven", ">=1.3.2")
] == fixed_version_ranges

# Check vulnerable packages
assert [
VersionSpecifier.from_scheme_version_spec_string("maven", "==1.3.0"),
VersionSpecifier.from_scheme_version_spec_string("maven", "==1.3.1"),
] == affected_version_ranges

def test_to_advisory(self):
expected_advisories = [
Advisory(
summary="A serious problem exists when a client sends a large number of "
"headers with the same header name. Apache uses up memory faster than the "
"amount of memory required to simply store the received data itself. That "
"is, memory use increases faster and faster as more headers are received, "
"rather than increasing at a constant rate. This makes a denial of service "
"attack based on this method more effective than methods which cause Apache"
" to use memory at a constant rate, since the attacker has to send less data.",
affected_packages=[
AffectedPackage(
vulnerable_package=PackageURL(
type="apache",
name="httpd",
version="1.3.0",
),
),
AffectedPackage(
vulnerable_package=PackageURL(
type="apache",
name="httpd",
version="1.3.1",
),
),
],
references=[
Reference(
url="https://httpd.apache.org/security/json/CVE-1999-1199.json",
severities=[
VulnerabilitySeverity(
system=scoring_systems["apache_httpd"],
value="important",
),
],
reference_id="CVE-1999-1199",
),
],
vulnerability_id="CVE-1999-1199",
)
]
found_advisories = [self.data_src.to_advisory(self.data)]
found_advisories = list(map(Advisory.normalized, found_advisories))
expected_advisories = list(map(Advisory.normalized, expected_advisories))
assert sorted(found_advisories) == sorted(expected_advisories)