From 1b064236c514caeb2832ac844c3be92bff0cbd6b Mon Sep 17 00:00:00 2001 From: Hritik Vijay Date: Fri, 19 Mar 2021 02:05:25 +0530 Subject: [PATCH 1/2] Make sure vulnerability id is_cve or is_vulcoid This makes sure that vulnerability id supplied in alpine_linux importer is either a cve, vulcoid or empty so as to stand on the definition of vulnerability id. It could be possible to introduce a validator at the model level for the same as well using these functions Signed-off-by: Hritik Vijay --- vulnerabilities/data_source.py | 20 ++++++++++++++++++++ vulnerabilities/importers/alpine_linux.py | 4 +++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/vulnerabilities/data_source.py b/vulnerabilities/data_source.py index 28895a9cb..54db90368 100644 --- a/vulnerabilities/data_source.py +++ b/vulnerabilities/data_source.py @@ -228,6 +228,26 @@ def batch_advisories(self, advisories: List[Advisory]) -> Set[Advisory]: b, advisories = advisories[: self.batch_size], advisories[self.batch_size :] yield b + @staticmethod + def is_cve(id: str): + c = id.split("-") + if len(c) == 3 and c[0].lower() == "cve" and c[1].isdigit() and c[2].isdigit(): + return True + return False + + @staticmethod + def is_vulcoid(id: str): + c = id.split("-") + if ( + len(c) == 4 + and c[0].lower() == "vulcoid" + and c[1].isdigit() + and c[2].isdigit() + and c[3].isdigit() + ): + return True + return False + @dataclasses.dataclass class GitDataSourceConfiguration(DataSourceConfiguration): diff --git a/vulnerabilities/importers/alpine_linux.py b/vulnerabilities/importers/alpine_linux.py index d3831f47c..f114b6edc 100644 --- a/vulnerabilities/importers/alpine_linux.py +++ b/vulnerabilities/importers/alpine_linux.py @@ -193,7 +193,9 @@ def _load_advisories( impacted_package_urls=[], resolved_package_urls=resolved_purls, references=references, - vulnerability_id=vuln_ids[0] if vuln_ids[0] != "CVE-????-?????" else "", + vulnerability_id=vuln_ids[0] + if self.is_cve(vuln_ids[0]) or self.is_vulcoid(vuln_ids[0]) + else "", ) ) From 4d020a7241654e785df1738ce04ac1700180553a Mon Sep 17 00:00:00 2001 From: Hritik Vijay Date: Fri, 19 Mar 2021 16:15:19 +0530 Subject: [PATCH 2/2] Use is_cve helper to validate vulnerablitiy id Signed-off-by: Hritik Vijay --- vulnerabilities/data_source.py | 25 ++++------------ vulnerabilities/importers/alpine_linux.py | 5 ++-- vulnerabilities/tests/test_import_runner.py | 32 ++++++++++----------- 3 files changed, 23 insertions(+), 39 deletions(-) diff --git a/vulnerabilities/data_source.py b/vulnerabilities/data_source.py index 54db90368..c911518ec 100644 --- a/vulnerabilities/data_source.py +++ b/vulnerabilities/data_source.py @@ -44,6 +44,7 @@ from vulnerabilities.oval_parser import OvalParser from vulnerabilities.severity_systems import ScoringSystem +from vulnerabilities.helpers import is_cve logger = logging.getLogger(__name__) @@ -88,6 +89,10 @@ class Advisory: resolved_package_urls: Iterable[PackageURL] = dataclasses.field(default_factory=list) references: List[Reference] = dataclasses.field(default_factory=list) + def __post_init__(self): + if self.vulnerability_id and not is_cve(self.vulnerability_id): + raise ValueError("CVE expected, found: {}".format(self.vulnerability_id)) + def normalized(self): impacted_package_urls = {package_url for package_url in self.impacted_package_urls} resolved_package_urls = {package_url for package_url in self.resolved_package_urls} @@ -228,26 +233,6 @@ def batch_advisories(self, advisories: List[Advisory]) -> Set[Advisory]: b, advisories = advisories[: self.batch_size], advisories[self.batch_size :] yield b - @staticmethod - def is_cve(id: str): - c = id.split("-") - if len(c) == 3 and c[0].lower() == "cve" and c[1].isdigit() and c[2].isdigit(): - return True - return False - - @staticmethod - def is_vulcoid(id: str): - c = id.split("-") - if ( - len(c) == 4 - and c[0].lower() == "vulcoid" - and c[1].isdigit() - and c[2].isdigit() - and c[3].isdigit() - ): - return True - return False - @dataclasses.dataclass class GitDataSourceConfiguration(DataSourceConfiguration): diff --git a/vulnerabilities/importers/alpine_linux.py b/vulnerabilities/importers/alpine_linux.py index f114b6edc..cd222ec46 100644 --- a/vulnerabilities/importers/alpine_linux.py +++ b/vulnerabilities/importers/alpine_linux.py @@ -38,6 +38,7 @@ from vulnerabilities.data_source import Advisory from vulnerabilities.data_source import DataSource from vulnerabilities.data_source import Reference +from vulnerabilities.helpers import is_cve BASE_URL = "https://secdb.alpinelinux.org/" @@ -193,9 +194,7 @@ def _load_advisories( impacted_package_urls=[], resolved_package_urls=resolved_purls, references=references, - vulnerability_id=vuln_ids[0] - if self.is_cve(vuln_ids[0]) or self.is_vulcoid(vuln_ids[0]) - else "", + vulnerability_id=vuln_ids[0] if is_cve(vuln_ids[0]) else "", ) ) diff --git a/vulnerabilities/tests/test_import_runner.py b/vulnerabilities/tests/test_import_runner.py index ee6da69e9..efda749b2 100644 --- a/vulnerabilities/tests/test_import_runner.py +++ b/vulnerabilities/tests/test_import_runner.py @@ -69,9 +69,9 @@ def save(self): ADVISORIES = [ Advisory( - vulnerability_id="MOCK-CVE-2020-1337", + vulnerability_id="CVE-2020-13371337", summary="vulnerability description here", - references=[Reference(url="https://example.com/with/more/info/MOCK-CVE-2020-1337")], + references=[Reference(url="https://example.com/with/more/info/CVE-2020-13371337")], impacted_package_urls=[PackageURL(name="mock-webserver", type="pypi", version="1.2.33")], resolved_package_urls=[PackageURL(name="mock-webserver", type="pypi", version="1.2.34")], ) @@ -113,11 +113,11 @@ def test_ImportRunner_new_package_and_new_vulnerability(db): assert resolved_package.vulnerabilities.count() == 1 vuln = impacted_package.vulnerabilities.first() - assert vuln.vulnerability_id == "MOCK-CVE-2020-1337" + assert vuln.vulnerability_id == "CVE-2020-13371337" vuln_refs = models.VulnerabilityReference.objects.filter(vulnerability=vuln) assert vuln_refs.count() == 1 - assert vuln_refs[0].url == "https://example.com/with/more/info/MOCK-CVE-2020-1337" + assert vuln_refs[0].url == "https://example.com/with/more/info/CVE-2020-13371337" def test_ImportRunner_existing_package_and_new_vulnerability(db): @@ -145,11 +145,11 @@ def test_ImportRunner_existing_package_and_new_vulnerability(db): impacted_package = models.PackageRelatedVulnerability.objects.filter(is_vulnerable=True)[0] vuln = impacted_package.vulnerability - assert vuln.vulnerability_id == "MOCK-CVE-2020-1337" + assert vuln.vulnerability_id == "CVE-2020-13371337" vuln_refs = models.VulnerabilityReference.objects.filter(vulnerability=vuln) assert vuln_refs.count() == 1 - assert vuln_refs[0].url == "https://example.com/with/more/info/MOCK-CVE-2020-1337" + assert vuln_refs[0].url == "https://example.com/with/more/info/CVE-2020-13371337" def test_ImportRunner_new_package_version_affected_by_existing_vulnerability(db): @@ -158,11 +158,11 @@ def test_ImportRunner_new_package_version_affected_by_existing_vulnerability(db) vulnerability that also already existed in the database. """ vuln = models.Vulnerability.objects.create( - vulnerability_id="MOCK-CVE-2020-1337", summary="vulnerability description here" + vulnerability_id="CVE-2020-13371337", summary="vulnerability description here" ) models.VulnerabilityReference.objects.create( - vulnerability=vuln, url="https://example.com/with/more/info/MOCK-CVE-2020-1337" + vulnerability=vuln, url="https://example.com/with/more/info/CVE-2020-13371337" ) models.PackageRelatedVulnerability.objects.create( vulnerability=vuln, @@ -200,7 +200,7 @@ def test_ImportRunner_new_package_version_affected_by_existing_vulnerability(db) ) assert len(qs) == 1 impacted_package = qs[0] - assert impacted_package.vulnerability.vulnerability_id == "MOCK-CVE-2020-1337" + assert impacted_package.vulnerability.vulnerability_id == "CVE-2020-13371337" # def test_ImportRunner_assumed_fixed_package_is_updated_as_impacted(db): @@ -213,11 +213,11 @@ def test_ImportRunner_new_package_version_affected_by_existing_vulnerability(db) # FIXME deleted, the referenced Package and Vulnerability are also deleted. # # vuln = models.Vulnerability.objects.create( -# vulnerability_id='MOCK-CVE-2020-1337', summary='vulnerability description here') +# vulnerability_id='CVE-2020-13371337', summary='vulnerability description here') # # models.VulnerabilityReference.objects.create( # vulnerability=vuln, -# url='https://example.com/with/more/info/MOCK-CVE-2020-1337' +# url='https://example.com/with/more/info/CVE-2020-13371337' # ) # # misclassified_package = models.Package.objects.create( @@ -255,11 +255,11 @@ def test_ImportRunner_fixed_package_version_is_added(db): A new version of a package was published that fixes a previously unresolved vulnerability. """ vuln = models.Vulnerability.objects.create( - vulnerability_id="MOCK-CVE-2020-1337", summary="vulnerability description here" + vulnerability_id="CVE-2020-13371337", summary="vulnerability description here" ) models.VulnerabilityReference.objects.create( - vulnerability=vuln, url="https://example.com/with/more/info/MOCK-CVE-2020-1337" + vulnerability=vuln, url="https://example.com/with/more/info/CVE-2020-13371337" ) models.PackageRelatedVulnerability.objects.create( vulnerability=vuln, @@ -288,7 +288,7 @@ def test_ImportRunner_fixed_package_version_is_added(db): ) assert len(qs) == 1 resolved_package = qs[0] - assert resolved_package.vulnerability.vulnerability_id == "MOCK-CVE-2020-1337" + assert resolved_package.vulnerability.vulnerability_id == "CVE-2020-13371337" def test_ImportRunner_updated_vulnerability(db): @@ -297,7 +297,7 @@ def test_ImportRunner_updated_vulnerability(db): reference. """ vuln = models.Vulnerability.objects.create( - vulnerability_id="MOCK-CVE-2020-1337", summary="temporary description" + vulnerability_id="CVE-2020-13371337", summary="temporary description" ) models.PackageRelatedVulnerability.objects.create( @@ -326,4 +326,4 @@ def test_ImportRunner_updated_vulnerability(db): vuln_refs = models.VulnerabilityReference.objects.filter(vulnerability=vuln) assert vuln_refs.count() == 1 - assert vuln_refs[0].url == "https://example.com/with/more/info/MOCK-CVE-2020-1337" + assert vuln_refs[0].url == "https://example.com/with/more/info/CVE-2020-13371337"