Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions vulnerabilities/importer_yielder.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@
'debian_tracker_url': 'https://security-tracker.debian.org/tracker/data/json'
},
},
# {
# 'name': 'safetydb',
# 'license': 'cc-by-nc-4.0',
# 'last_run': None,
# 'data_source': 'SafetyDbDataSource',
# 'data_source_cfg': {
# 'url': 'https://raw.githubusercontent.com/pyupio/safety-db/master/data/insecure_full.json', # nopep8
# 'etags': {}
# },
# },
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a way to selectively enable non-free resources like this one. As long as this data is under a CC-BY-NC license, we cannot integrate it by default, IMHO.
@jayfk ping?

'name': 'safetydb',
'license': 'cc-by-nc-4.0',
'last_run': None,
'data_source': 'SafetyDbDataSource',
'data_source_cfg': {
'url': 'https://raw.githubusercontent.com/pyupio/safety-db/master/data/insecure_full.json', # nopep8
'etags': {}
},
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's instead add a flag to disable this, rather than commenting it out, and leave it disabled.

{
'name': 'npm',
'license': 'mit',
Expand Down
73 changes: 44 additions & 29 deletions vulnerabilities/importers/safety_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@

import asyncio
import dataclasses
import json
import re
import logging
from typing import Any
from typing import Iterable
from typing import Mapping
from typing import Set
from typing import Tuple
from urllib.error import HTTPError
from urllib.request import urlopen
import requests

import requests
from dephell_specifier import RangeSpecifier
from packageurl import PackageURL
from schema import Or
Expand All @@ -47,20 +46,20 @@
from vulnerabilities.data_source import Reference
from vulnerabilities.package_managers import PypiVersionAPI

logger = logging.getLogger(__name__)


def validate_schema(advisory_dict):
    """
    Validate the advisories published for a single package.

    ``advisory_dict`` is expected to be a list of advisory mappings, i.e. the
    value stored under one package name in the feed, not the whole feed.
    Each advisory must provide:

    - ``advisory``: free-text description (str)
    - ``cve``: ``None`` or a string; no format is enforced here
    - ``id``: a string starting with ``pyup.io-`` followed by a digit
    - ``specs``: a list of version specifiers
    - ``v``: a string

    Any mismatch is reported by ``Schema.validate`` raising; the exception is
    propagated to the caller unchanged.
    """
    scheme = [
        {
            "advisory": str,
            "cve": Or(None, str),
            # The dot is escaped so that only a literal "pyup.io-" prefix is
            # accepted, rather than "pyup" + any character + "io-".
            "id": Regex(r"^pyup\.io-\d"),
            "specs": list,
            "v": str,
        }
    ]

    Schema(scheme).validate(advisory_dict)

Expand All @@ -78,7 +77,6 @@ class SafetyDbDataSource(DataSource):
def __init__(self, *args, **kwargs):
    """
    Initialise the data source and eagerly fetch the feed.

    All arguments are forwarded untouched to the parent initialiser. The
    fetched payload is stored on ``self._api_response``.

    The payload is deliberately not validated here: ``validate_schema``
    describes the advisory list of a single package, so it cannot match the
    whole feed. ``updated_advisories`` validates each package entry instead.
    """
    super().__init__(*args, **kwargs)
    self._api_response = self._fetch()

def __enter__(self):
self._versions = PypiVersionAPI()
Expand All @@ -93,9 +91,7 @@ def set_api(self, packages):

def _fetch(self) -> Mapping[str, Any]:
    """
    Download the feed from ``self.config.url``.

    The download only happens when ``self.create_etag`` returns a truthy
    value for the URL; in that case the decoded JSON body is returned.
    Otherwise an empty list is returned, so callers that iterate over the
    result simply see no packages.
    """
    if self.create_etag(self.config.url):
        return requests.get(self.config.url).json()
    # NOTE(review): an empty list does not match the Mapping annotation; it is
    # kept as-is because callers may rely on the existing value.
    return []

def collect_packages(self):
Expand All @@ -105,22 +101,33 @@ def updated_advisories(self) -> Set[Advisory]:
advisories = []

for package_name in self._api_response:
if package_name == "$meta":
# This is the first entry in the data feed. It contains metadata of the feed.
# Skip it.
continue

try:
validate_schema(self._api_response[package_name])

except Exception as e:
logger.error(e)
continue

all_package_versions = self.versions.get(package_name)
if len(all_package_versions) == 0:
if not len(all_package_versions):
# PyPi does not have data about this package, we skip these
continue

for advisory in self._api_response[package_name]:

impacted_purls, resolved_purls = categorize_versions(
package_name, all_package_versions, advisory["specs"]
)

cve_ids = advisory.get("cve") or [""]

# meaning if cve_ids is not [''] but either ['CVE-123'] or ['CVE-123, CVE-124']
if len(cve_ids[0]):
cve_ids = [s.strip() for s in cve_ids.split(",")]
if advisory["cve"]:
# Check on advisory["cve"] instead of using `get` because it can have null value
cve_ids = re.findall(r"CVE-\d+-\d+", advisory["cve"])
else:
cve_ids = [None]

reference = [Reference(reference_id=advisory["id"])]

Expand All @@ -138,7 +145,7 @@ def updated_advisories(self) -> Set[Advisory]:
return self.batch_advisories(advisories)

def create_etag(self, url):
etag = requests.head(url).headers.get('ETag')
etag = requests.head(url).headers.get("ETag")
if not etag:
# Kind of inaccurate to return True since etag is
# not created
Expand All @@ -151,7 +158,9 @@ def create_etag(self, url):


def categorize_versions(
package_name: str, all_versions: Set[str], version_specs: Iterable[str],
package_name: str,
all_versions: Set[str],
version_specs: Iterable[str],
) -> Tuple[Set[PackageURL], Set[PackageURL]]:
"""
:return: impacted, resolved purls
Expand All @@ -163,7 +172,13 @@ def categorize_versions(
if any([version in r for r in ranges]):
impacted_versions.add(version)

impacted_purls.add(PackageURL(name=package_name, type="pypi", version=version,))
impacted_purls.add(
PackageURL(
name=package_name,
type="pypi",
version=version,
)
)

resolved_purls = set()
for version in all_versions - impacted_versions:
Expand Down
2 changes: 1 addition & 1 deletion vulnerabilities/package_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# VulnerableCode should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
# VulnerableCode is a free software code scanning tool from nexB Inc. and others.
# VulnerableCode is free software from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import asyncio
Expand Down