Skip to content

Modify Nginx pipeline importer to support package-first mode #1917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions vulnerabilities/pipelines/nginx_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ class NginxImporterPipeline(VulnerableCodeBaseImporterPipeline):
url = "https://nginx.org/en/security_advisories.html"
importer_name = "Nginx Importer"

is_batch_run = True

def __init__(self, *args, purl=None, **kwargs):
super().__init__(*args, **kwargs)
self.purl = purl
if self.purl:
NginxImporterPipeline.is_batch_run = False
if self.purl.type != "nginx":
print(
f"Warning: PURL type {self.purl.type} is not 'nginx', may not match any advisories"
)

@classmethod
def steps(cls):
return (
Expand All @@ -57,8 +69,46 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
soup = BeautifulSoup(self.advisory_data, features="lxml")
vulnerability_list = soup.select("li p")
for vulnerability_info in vulnerability_list:
ngnix_advisory = parse_advisory_data_from_paragraph(vulnerability_info)
yield to_advisory_data(ngnix_advisory)
nginx_advisory = parse_advisory_data_from_paragraph(vulnerability_info)
advisory_data = to_advisory_data(nginx_advisory)

if self.purl and not self._is_advisory_affecting_purl(advisory_data):
continue

yield advisory_data

def _is_advisory_affecting_purl(self, advisory_data):
if not self.purl:
return True

if self.purl.type != "nginx":
return False

for affected_package in advisory_data.affected_packages:
if affected_package.package.type != self.purl.type:
continue

if affected_package.package.name != self.purl.name:
continue

if self.purl.qualifiers and "os" in self.purl.qualifiers:
if (
not affected_package.package.qualifiers
or "os" not in affected_package.package.qualifiers
or affected_package.package.qualifiers["os"] != self.purl.qualifiers["os"]
):
continue

if self.purl.version:
purl_version = NginxVersion(self.purl.version)

affected_range = affected_package.affected_version_range
if affected_range and purl_version not in affected_range:
continue

return True

return False


class NginxAdvisory(NamedTuple):
Expand Down
39 changes: 39 additions & 0 deletions vulnerabilities/tests/pipelines/test_nginx_importer_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from bs4 import BeautifulSoup
from commoncode import testcase
from django.db.models.query import QuerySet
from packageurl import PackageURL
from univers.version_range import NginxVersionRange

from vulnerabilities import models
Expand Down Expand Up @@ -239,3 +240,41 @@ def test_NginxBasicImprover__get_inferences_from_versions_end_to_end(self):
"improver/improver-inferences-expected.json", must_exist=False
)
util_tests.check_results_against_json(results, expected_file)

def test_nginx_importer_package_first_mode_found(self):
test_file = self.get_test_loc("security_advisories.html")
with open(test_file) as tf:
test_text = tf.read()

purl = PackageURL(type="nginx", name="nginx", version="1.17.2")
pipeline = nginx_importer.NginxImporterPipeline(purl=purl)
pipeline.advisory_data = test_text
advisories = list(pipeline.collect_advisories())
expected_file = self.get_test_loc("security_advisories-single-version-expected.json")
advisories_dicts = [a.to_dict() for a in advisories]
util_tests.check_results_against_json(advisories_dicts, expected_file)

def test_nginx_importer_package_first_mode_none_found(self):
test_file = self.get_test_loc("security_advisories.html")
with open(test_file) as tf:
test_text = tf.read()

purl = PackageURL(type="nginx", name="nonexistent")
pipeline = nginx_importer.NginxImporterPipeline(purl=purl)
pipeline.advisory_data = test_text
advisories = list(pipeline.collect_advisories())
assert advisories == []

def test_nginx_importer_package_first_mode_with_os_qualifier(self):
test_file = self.get_test_loc("security_advisories.html")
with open(test_file) as tf:
test_text = tf.read()

purl = PackageURL(type="nginx", name="nginx", qualifiers={"os": "windows"})
pipeline = nginx_importer.NginxImporterPipeline(purl=purl)
pipeline.advisory_data = test_text
advisories = list(pipeline.collect_advisories())

for adv in advisories:
for pkg in adv.affected_packages:
assert pkg.package.qualifiers.get("os") == "windows"
Loading