From 17c386c22ac272dcef1b244f106fa512ccb8b617 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Sun, 15 Jun 2025 03:59:55 +0300 Subject: [PATCH 1/2] Extract API logic to utils from Gitlab datasource #1903 * Separate Gitlab API handling logic from vulntotal Gitlab datasource to utils file Signed-off-by: Michael Ehab Mikhail --- vulntotal/datasources/gitlab.py | 47 ++++----------- vulntotal/datasources/gitlab_api.py | 56 ++++++++++++++++++ .../gitlab/parsed_advisory-expected.json | 59 ++++++------------- vulntotal/tests/test_gitlab.py | 37 ++++++++---- 4 files changed, 111 insertions(+), 88 deletions(-) create mode 100644 vulntotal/datasources/gitlab_api.py diff --git a/vulntotal/datasources/gitlab.py b/vulntotal/datasources/gitlab.py index dbf84dce7..f9a3b6944 100644 --- a/vulntotal/datasources/gitlab.py +++ b/vulntotal/datasources/gitlab.py @@ -19,6 +19,8 @@ from fetchcode import fetch from packageurl import PackageURL +from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl +from vulntotal.datasources.gitlab_api import fetch_yaml from vulntotal.validator import DataSource from vulntotal.validator import VendorData from vulntotal.vulntotal_utils import gitlab_constraints_satisfied @@ -40,18 +42,12 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]: Yields: VendorData instance containing the advisory information for the package. """ - package_slug = get_package_slug(purl) - directory_files = fetch_directory_contents(package_slug) - if not directory_files: - path = self.supported_ecosystem()[purl.type] - casesensitive_package_slug = get_casesensitive_slug(path, package_slug) - directory_files = fetch_directory_contents(casesensitive_package_slug) + advisories = fetch_gitlab_advisories_for_purl( + purl, self.supported_ecosystem(), get_casesensitive_slug + ) - if directory_files: - yml_files = [file for file in directory_files if file["name"].endswith(".yml")] - - interesting_advisories = parse_interesting_advisories(yml_files, purl) - return interesting_advisories + if advisories: + return parse_interesting_advisories(advisories, purl) @classmethod def supported_ecosystem(cls): @@ -67,21 +63,6 @@ def supported_ecosystem(cls): } -def fetch_directory_contents(package_slug): - url = f"https://gitlab.com/api/v4/projects/12006272/repository/tree?path={package_slug}" - response = requests.get(url) - if response.status_code == 200: - return response.json() - - -def fetch_yaml(file_path): - response = requests.get( - f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}" - ) - if response.status_code == 200: - return response.text - - def get_package_slug(purl): """ Constructs a package slug from a given purl. @@ -163,12 +144,12 @@ def get_casesensitive_slug(path, package_slug): has_next = paginated_tree["pageInfo"]["hasNextPage"] -def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]: +def parse_interesting_advisories(advisories, purl) -> Iterable[VendorData]: """ Parses advisories from YAML files in a given location that match a given version. Parameters: - yml_files: An array having the paths of yml files to parse. + advisories: A list of advisory dictionaries fetched from the GitLab API. purl: PURL for the advisory. Yields: @@ -176,14 +157,12 @@ def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]: """ version = purl.version - for file in yml_files: - yml_data = fetch_yaml(file["path"]) - gitlab_advisory = saneyaml.load(yml_data) - affected_range = gitlab_advisory["affected_range"] + for advisory in advisories: + affected_range = advisory.get("affected_range") if gitlab_constraints_satisfied(affected_range, version): yield VendorData( purl=PackageURL(purl.type, purl.namespace, purl.name), - aliases=gitlab_advisory["identifiers"], + aliases=advisory.get("identifiers", []), affected_versions=[affected_range], - fixed_versions=gitlab_advisory["fixed_versions"], + fixed_versions=advisory.get("fixed_versions", []), ) diff --git a/vulntotal/datasources/gitlab_api.py b/vulntotal/datasources/gitlab_api.py new file mode 100644 index 000000000..278c51138 --- /dev/null +++ b/vulntotal/datasources/gitlab_api.py @@ -0,0 +1,56 @@ +import requests +import saneyaml + + +def fetch_directory_contents(package_slug): + url = f"https://gitlab.com/api/v4/projects/12006272/repository/tree?path={package_slug}" + response = requests.get(url) + if response.status_code == 200: + return response.json() + return [] + + +def fetch_yaml(file_path): + response = requests.get( + f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}" + ) + if response.status_code == 200: + return response.text + return None + + +def get_package_slug(purl, supported_ecosystem): + if purl.type not in supported_ecosystem: + return + ecosystem = supported_ecosystem[purl.type] + package_name = purl.name + if purl.type in ("maven", "composer", "golang"): + package_name = f"{purl.namespace}/{purl.name}" + return f"{ecosystem}/{package_name}" + + +def get_directory_yml_files(purl, supported_ecosystem, get_casesensitive_slug): + package_slug = get_package_slug(purl, supported_ecosystem) + directory_files = fetch_directory_contents(package_slug) + if not directory_files: + path = supported_ecosystem[purl.type] + casesensitive_package_slug = get_casesensitive_slug(path, package_slug) + directory_files = fetch_directory_contents(casesensitive_package_slug) + if not directory_files: + return [] + return [file for file in directory_files if file["name"].endswith(".yml")] + + +def fetch_gitlab_advisories_for_purl(purl, supported_ecosystem, get_casesensitive_slug): + yml_files = get_directory_yml_files(purl, supported_ecosystem, get_casesensitive_slug) + + advisories = [] + for file in yml_files: + yml_data = fetch_yaml(file["path"]) + if yml_data: + advisories.append(saneyaml.load(yml_data)) + return advisories + + +def get_estimated_advisories_count(purl, supported_ecosystem, get_casesensitive_slug): + return len(get_directory_yml_files(purl, supported_ecosystem, get_casesensitive_slug)) diff --git a/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json b/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json index 1a4e4a024..6391ed740 100644 --- a/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json +++ b/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json @@ -1,51 +1,26 @@ [ { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<=2.7.1" - ], - "fixed_versions": [ - "2.7.2" - ], - "aliases": [ - "CVE-2014-1402" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<=2.7.1"], + "fixed_versions": ["2.7.2"], + "aliases": ["CVE-2014-1402"] }, { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<2.8.1" - ], - "fixed_versions": [ - "2.8.1" - ], - "aliases": [ - "GHSA-hj2j-77xm-mc5v", - "CVE-2016-10745" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<2.8.1"], + "fixed_versions": ["2.8.1"], + "aliases": ["GHSA-hj2j-77xm-mc5v", "CVE-2016-10745"] }, { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<2.10.1" - ], - "fixed_versions": [ - "2.10.1" - ], - "aliases": [ - "CVE-2019-10906" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<2.10.1"], + "fixed_versions": ["2.10.1"], + "aliases": ["CVE-2019-10906"] }, { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<2.11.3" - ], - "fixed_versions": [ - "2.11.3" - ], - "aliases": [ - "CVE-2020-28493" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<2.11.3"], + "fixed_versions": ["2.11.3"], + "aliases": ["CVE-2020-28493"] } -] \ No newline at end of file +] diff --git a/vulntotal/tests/test_gitlab.py b/vulntotal/tests/test_gitlab.py index 2870e81dc..3f8e993a4 100644 --- a/vulntotal/tests/test_gitlab.py +++ b/vulntotal/tests/test_gitlab.py @@ -14,6 +14,7 @@ from vulnerabilities.tests import util_tests from vulntotal.datasources import gitlab +from vulntotal.datasources import gitlab_api class TestGitlab(testcase.FileBasedTesting): @@ -28,12 +29,26 @@ def test_generate_package_advisory_url(self): "pkg:composer/bolt/core@0.1", "pkg:nuget/moment.js@2.18.0", ] - results = [gitlab.get_package_slug(PackageURL.from_string(purl)) for purl in purls] + supported_ecosystem = gitlab.GitlabDataSource.supported_ecosystem() + results = [ + gitlab_api.get_package_slug(PackageURL.from_string(purl), supported_ecosystem) + for purl in purls + ] expected_file = self.get_test_loc("package_advisory_url-expected.json", must_exist=False) util_tests.check_results_against_json(results, expected_file) - @mock.patch("vulntotal.datasources.gitlab.fetch_yaml") - def test_parse_interesting_advisories(self, mock_fetch_yaml): + @mock.patch("vulntotal.datasources.gitlab_api.fetch_yaml") + @mock.patch("vulntotal.datasources.gitlab_api.fetch_directory_contents") + def test_parse_interesting_advisories(self, mock_fetch_directory_contents, mock_fetch_yaml): + # Mock the directory contents response + mock_fetch_directory_contents.return_value = [ + {"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"}, + {"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"}, + {"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"}, + {"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"}, + {"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"}, + ] + # Mock the yaml file responses advisory_folder = ( Path(__file__) @@ -51,17 +66,15 @@ def test_parse_interesting_advisories(self, mock_fetch_yaml): mock_fetch_yaml.side_effect = yaml_files - purl = PackageURL("generic", "namespace", "test", "0.1.1") + purl = PackageURL("pypi", "namespace", "test", "0.1.1") - yml_files = [ - {"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"}, - {"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"}, - {"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"}, - {"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"}, - {"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"}, - ] + supported_ecosystem = gitlab.GitlabDataSource.supported_ecosystem() + + advisories = gitlab_api.fetch_gitlab_advisories_for_purl( + purl, supported_ecosystem, gitlab.get_casesensitive_slug + ) - results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(yml_files, purl)] + results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(advisories, purl)] expected_file = self.get_test_loc("parsed_advisory-expected.json", must_exist=False) util_tests.check_results_against_json(results, expected_file) From fb49371ff40ff3daaf765530b0fa1e586911808a Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Sun, 15 Jun 2025 04:01:18 +0300 Subject: [PATCH 2/2] Modify Gitlab importer to support package-first mode #1903 * Use the GitLab API utils from vulntotal to support package-first mode. Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/pipelines/gitlab_importer.py | 274 ++++++++++++------ .../test_gitlab_importer_pipeline.py | 27 ++ .../gitlab/pypi-single-mode-expected.json | 35 +++ 3 files changed, 251 insertions(+), 85 deletions(-) create mode 100644 vulnerabilities/tests/test_data/gitlab/pypi-single-mode-expected.json diff --git a/vulnerabilities/pipelines/gitlab_importer.py b/vulnerabilities/pipelines/gitlab_importer.py index 4f25c4d94..d2ad1186e 100644 --- a/vulnerabilities/pipelines/gitlab_importer.py +++ b/vulnerabilities/pipelines/gitlab_importer.py @@ -13,6 +13,7 @@ from typing import Iterable from typing import List from typing import Tuple +from urllib.parse import urljoin import pytz import saneyaml @@ -31,6 +32,9 @@ from vulnerabilities.utils import build_description from vulnerabilities.utils import get_advisory_url from vulnerabilities.utils import get_cwe_id +from vulntotal.datasources.gitlab import get_casesensitive_slug +from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl +from vulntotal.datasources.gitlab_api import get_estimated_advisories_count class GitLabImporterPipeline(VulnerableCodeBaseImporterPipeline): @@ -42,9 +46,16 @@ class GitLabImporterPipeline(VulnerableCodeBaseImporterPipeline): license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE" importer_name = "GitLab Importer" repo_url = "git+https://gitlab.com/gitlab-org/advisories-community/" + is_batch_run = True @classmethod def steps(cls): + if not cls.is_batch_run: + return ( + cls.collect_and_store_advisories, + cls.import_new_advisories, + ) + return ( cls.clone, cls.collect_and_store_advisories, @@ -66,15 +77,57 @@ def steps(cls): gitlab_scheme_by_purl_type = {v: k for k, v in purl_type_by_gitlab_scheme.items()} + def __init__(self, *args, purl=None, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + # If a purl is provided, we are running in package-first mode + if self.purl: + GitLabImporterPipeline.is_batch_run = False + def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) def advisories_count(self): - root = Path(self.vcs_response.dest_dir) - return sum(1 for _ in root.rglob("*.yml")) + if GitLabImporterPipeline.is_batch_run: + root = Path(self.vcs_response.dest_dir) + return sum(1 for _ in root.rglob("*.yml")) + else: + return get_estimated_advisories_count( + self.purl, self.purl_type_by_gitlab_scheme, get_casesensitive_slug + ) def collect_advisories(self) -> Iterable[AdvisoryData]: + if not self.is_batch_run: + advisories = fetch_gitlab_advisories_for_purl( + self.purl, self.purl_type_by_gitlab_scheme, get_casesensitive_slug + ) + + input_version = self.purl.version + vrc = RANGE_CLASS_BY_SCHEMES[self.purl.type] + version_obj = vrc.version_class(input_version) if input_version else None + + for advisory in advisories: + advisory_data = self._advisory_dict_to_advisory_data(advisory) + # If purl has version, we need to check if advisory affects the version + if input_version: + affected = False + for affected_package in advisory_data.affected_packages: + vrange = affected_package.affected_version_range + fixed_version = affected_package.fixed_version + if vrange and version_obj in vrange: + if fixed_version: + fixed_version_obj = vrc.version_class(str(fixed_version)) + if version_obj >= fixed_version_obj: + continue + affected = True + break + if affected: + yield advisory_data + else: + yield advisory_data + return + base_path = Path(self.vcs_response.dest_dir) for file_path in base_path.rglob("*.yml"): @@ -109,6 +162,135 @@ def clean_downloads(self): def on_failure(self): self.clean_downloads() + def _advisory_dict_to_advisory_data(self, advisory): + return advisory_dict_to_advisory_data( + advisory=advisory, + purl_type_by_gitlab_scheme=self.purl_type_by_gitlab_scheme, + gitlab_scheme_by_purl_type=self.gitlab_scheme_by_purl_type, + logger=self.log, + purl=self.purl, + ) + + +def advisory_dict_to_advisory_data( + advisory: dict, + purl_type_by_gitlab_scheme, + gitlab_scheme_by_purl_type, + logger, + purl=None, + advisory_url=None, +): + """ + Convert a GitLab advisory dict to AdvisoryData. + """ + aliases = advisory.get("identifiers", []) + identifier = advisory.get("identifier", "") + summary = build_description(advisory.get("title"), advisory.get("description")) + urls = advisory.get("urls", []) + references = [Reference.from_url(u) for u in urls] + + cwe_ids = advisory.get("cwe_ids") or [] + cwe_list = list(map(get_cwe_id, cwe_ids)) + + date_published = dateparser.parse(advisory.get("pubdate")) + date_published = date_published.replace(tzinfo=pytz.UTC) + + package_slug = advisory.get("package_slug") + + # Determine purl if not provided + if not purl: + purl = get_purl( + package_slug=package_slug, + purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme, + logger=logger, + ) + + if not purl: + logger( + f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}", + level=logging.ERROR, + ) + return AdvisoryData( + aliases=aliases, + summary=summary, + references=references, + date_published=date_published, + url=advisory_url, + ) + + affected_version_range = None + fixed_versions = advisory.get("fixed_versions") or [] + affected_range = advisory.get("affected_range") + gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"]) + vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type] + gitlab_scheme = gitlab_scheme_by_purl_type[purl.type] + try: + if affected_range: + if gitlab_scheme in gitlab_native_schemes: + affected_version_range = from_gitlab_native( + gitlab_scheme=gitlab_scheme, string=affected_range + ) + else: + affected_version_range = vrc.from_native(affected_range) + except Exception as e: + logger( + f"advisory_dict_to_advisory_data: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}", + level=logging.ERROR, + ) + + parsed_fixed_versions = [] + for fixed_version in fixed_versions: + try: + fixed_version = vrc.version_class(fixed_version) + parsed_fixed_versions.append(fixed_version) + except Exception as e: + logger( + f"advisory_dict_to_advisory_data: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}", + level=logging.ERROR, + ) + + purl_without_version = get_purl( + package_slug=package_slug, + purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme, + logger=logger, + ) + + if parsed_fixed_versions: + affected_packages = list( + extract_affected_packages( + affected_version_range=affected_version_range, + fixed_versions=parsed_fixed_versions, + purl=purl_without_version, + ) + ) + else: + if not affected_version_range: + affected_packages = [] + else: + affected_packages = [ + AffectedPackage( + package=purl_without_version, + affected_version_range=affected_version_range, + ) + ] + + # Determine advisory_url if not provided + if not advisory_url and package_slug and identifier: + advisory_url = urljoin( + "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/", + package_slug + "/" + identifier + ".yml", + ) + + return AdvisoryData( + aliases=aliases, + summary=summary, + references=references, + date_published=date_published, + affected_packages=affected_packages, + weaknesses=cwe_list, + url=advisory_url, + ) + def parse_advisory_path(base_path: Path, file_path: Path) -> Tuple[str, str, str]: """ @@ -219,94 +401,16 @@ def parse_gitlab_advisory( ) return - # refer to schema here https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json - aliases = gitlab_advisory.get("identifiers") - summary = build_description(gitlab_advisory.get("title"), gitlab_advisory.get("description")) - urls = gitlab_advisory.get("urls") - references = [Reference.from_url(u) for u in urls] - - cwe_ids = gitlab_advisory.get("cwe_ids") or [] - cwe_list = list(map(get_cwe_id, cwe_ids)) - - date_published = dateparser.parse(gitlab_advisory.get("pubdate")) - date_published = date_published.replace(tzinfo=pytz.UTC) - package_slug = gitlab_advisory.get("package_slug") advisory_url = get_advisory_url( file=file, base_path=base_path, url="https://gitlab.com/gitlab-org/advisories-community/-/blob/main/", ) - purl: PackageURL = get_purl( - package_slug=package_slug, + + return advisory_dict_to_advisory_data( + advisory=gitlab_advisory, purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme, + gitlab_scheme_by_purl_type=gitlab_scheme_by_purl_type, logger=logger, - ) - if not purl: - logger( - f"parse_yaml_file: purl is not valid: {file!r} {package_slug!r}", level=logging.ERROR - ) - return AdvisoryData( - aliases=aliases, - summary=summary, - references=references, - date_published=date_published, - url=advisory_url, - ) - affected_version_range = None - fixed_versions = gitlab_advisory.get("fixed_versions") or [] - affected_range = gitlab_advisory.get("affected_range") - gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"]) - vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type] - gitlab_scheme = gitlab_scheme_by_purl_type[purl.type] - try: - if affected_range: - if gitlab_scheme in gitlab_native_schemes: - affected_version_range = from_gitlab_native( - gitlab_scheme=gitlab_scheme, string=affected_range - ) - else: - affected_version_range = vrc.from_native(affected_range) - except Exception as e: - logger( - f"parse_yaml_file: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}", - level=logging.ERROR, - ) - - parsed_fixed_versions = [] - for fixed_version in fixed_versions: - try: - fixed_version = vrc.version_class(fixed_version) - parsed_fixed_versions.append(fixed_version) - except Exception as e: - logger( - f"parse_yaml_file: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}", - level=logging.ERROR, - ) - - if parsed_fixed_versions: - affected_packages = list( - extract_affected_packages( - affected_version_range=affected_version_range, - fixed_versions=parsed_fixed_versions, - purl=purl, - ) - ) - else: - if not affected_version_range: - affected_packages = [] - else: - affected_packages = [ - AffectedPackage( - package=purl, - affected_version_range=affected_version_range, - ) - ] - return AdvisoryData( - aliases=aliases, - summary=summary, - references=references, - date_published=date_published, - affected_packages=affected_packages, - weaknesses=cwe_list, - url=advisory_url, + advisory_url=advisory_url, ) diff --git a/vulnerabilities/tests/pipelines/test_gitlab_importer_pipeline.py b/vulnerabilities/tests/pipelines/test_gitlab_importer_pipeline.py index c3dc7be43..aa3726898 100644 --- a/vulnerabilities/tests/pipelines/test_gitlab_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/test_gitlab_importer_pipeline.py @@ -12,6 +12,8 @@ from unittest import mock import pytest +import saneyaml +from packageurl import PackageURL from vulnerabilities.importer import AdvisoryData from vulnerabilities.improvers.default import DefaultImprover @@ -76,3 +78,28 @@ def test_gitlab_improver(mock_response, pkg_type): inference = [data.to_dict() for data in improver.get_inferences(advisory)] result.extend(inference) util_tests.check_results_against_json(result, expected_file) + + +@mock.patch("vulnerabilities.pipelines.gitlab_importer.fetch_gitlab_advisories_for_purl") +def test_gitlab_importer_package_first_mode_found_with_version(mock_fetch): + pkg_type = "pypi" + response_file = TEST_DATA / f"{pkg_type}.yaml" + expected_file = TEST_DATA / f"{pkg_type}-single-mode-expected.json" + + with open(response_file) as f: + advisory_dict = saneyaml.load(f) + + mock_fetch.return_value = [advisory_dict] + purl = PackageURL(type="pypi", name="flask", version="0.9") + pipeline = gitlab_importer.GitLabImporterPipeline(purl=purl) + advisories = list(pipeline.collect_advisories()) + util_tests.check_results_against_json(advisories[0].to_dict(), expected_file) + + +@mock.patch("vulnerabilities.pipelines.gitlab_importer.fetch_gitlab_advisories_for_purl") +def test_gitlab_importer_package_first_mode_none_found(mock_fetch): + mock_fetch.return_value = [] + purl = PackageURL(type="pypi", name="flask", version="1.2") + pipeline = gitlab_importer.GitLabImporterPipeline(purl=purl) + advisories = list(pipeline.collect_advisories()) + assert advisories == [] diff --git a/vulnerabilities/tests/test_data/gitlab/pypi-single-mode-expected.json b/vulnerabilities/tests/test_data/gitlab/pypi-single-mode-expected.json new file mode 100644 index 000000000..dd43a35a3 --- /dev/null +++ b/vulnerabilities/tests/test_data/gitlab/pypi-single-mode-expected.json @@ -0,0 +1,35 @@ +{ + "aliases": ["CVE-2019-1010083"], + "summary": "Denial of service\nDenial of Service due to unexpected memory usage in the Pallets Project Flask", + "affected_packages": [ + { + "package": { + "type": "pypi", + "namespace": "", + "name": "flask", + "version": "", + "qualifiers": "", + "subpath": "" + }, + "affected_version_range": "vers:pypi/<1.0", + "fixed_version": "1.0" + } + ], + "references": [ + { + "reference_id": "CVE-2019-1010083", + "reference_type": "", + "url": "https://nvd.nist.gov/vuln/detail/CVE-2019-1010083", + "severities": [] + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://www.palletsprojects.com/blog/flask-1-0-released/", + "severities": [] + } + ], + "date_published": "2019-07-17T00:00:00+00:00", + "weaknesses": [1035, 937], + "url": "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/pypi/Flask/CVE-2019-1010083.yml" +}