diff --git a/vulnerabilities/importers/ruby.py b/vulnerabilities/importers/ruby.py index 268419587..e1ce6774b 100644 --- a/vulnerabilities/importers/ruby.py +++ b/vulnerabilities/importers/ruby.py @@ -10,11 +10,16 @@ import logging from pathlib import Path from typing import Iterable +from typing import List +from typing import Optional +import requests +import saneyaml from dateutil.parser import parse from packageurl import PackageURL from pytz import UTC from univers.version_range import GemVersionRange +from univers.versions import RubygemsVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -52,7 +57,69 @@ class RubyImporter(Importer): SOFTWARE. """ + def __init__(self, purl=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl and self.purl.type not in ("gem", "ruby"): + print( + f"Warning: PURL type {self.purl.type} is not 'gem' or 'ruby, may not match any advisories" + ) + def advisory_data(self) -> Iterable[AdvisoryData]: + if not self.purl: + return self._batch_advisory_data() + + return self._package_first_advisory_data() + + def _package_first_advisory_data(self) -> Iterable[AdvisoryData]: + if self.purl.type not in ("gem", "ruby"): + return [] + + try: + yaml_files = [] + + if self.purl.type == "gem": + files = self._fetch_github_directory_content(f"gems/{self.purl.name}") + yaml_files.extend( + [ + (file, "gems") + for file in files + if file.endswith(".yml") and not file.startswith("OSVDB-") + ] + ) + elif self.purl.type == "ruby": + files = self._fetch_github_directory_content("rubies") + yaml_files.extend( + [ + (file, "rubies") + for file in files + if file.endswith(".yml") and not file.startswith("OSVDB-") + ] + ) + + for file_path, schema_type in yaml_files: + content = self._fetch_github_file_content(file_path) + if not content: + continue + + raw_data = saneyaml.load(content) + + if schema_type == "rubies" and raw_data.get("engine") != self.purl.name: + continue + + advisory_url = ( + f"https://github.com/rubysec/ruby-advisory-db/blob/master/{file_path}" + ) + advisory = parse_ruby_advisory(raw_data, schema_type, advisory_url) + + if advisory and self._advisory_affects_purl(advisory): + yield advisory + + except Exception as e: + logger.error(f"Error fetching advisories for {self.purl}: {str(e)}") + return [] + + def _batch_advisory_data(self) -> Iterable[AdvisoryData]: try: self.clone(self.repo_url) base_path = Path(self.vcs_response.dest_dir) @@ -72,6 +139,56 @@ def advisory_data(self) -> Iterable[AdvisoryData]: if self.vcs_response: self.vcs_response.delete() + def _advisory_affects_purl(self, advisory: AdvisoryData) -> bool: + if not self.purl: + return True + + for affected_package in advisory.affected_packages: + if affected_package.package.type != self.purl.type: + continue + + if affected_package.package.name != self.purl.name: + continue + + if self.purl.version and affected_package.affected_version_range: + purl_version = RubygemsVersion(self.purl.version) + + if purl_version not in affected_package.affected_version_range: + continue + + return True + + return False + + def _fetch_github_directory_content(self, path: str) -> List[str]: + url = f"https://api.github.com/repos/rubysec/ruby-advisory-db/contents/{path}" + response = requests.get(url) + + if response.status_code != 200: + logger.error(f"Failed to fetch directory contents from GitHub: {response.status_code}") + return [] + + contents = response.json() + file_paths = [] + + for item in contents: + if item["type"] == "file": + file_paths.append(item["path"]) + elif item["type"] == "dir": + file_paths.extend(self._fetch_github_directory_content(item["path"])) + + return file_paths + + def _fetch_github_file_content(self, path: str) -> Optional[str]: + url = f"https://api.github.com/repos/rubysec/ruby-advisory-db/contents/{path}" + response = requests.get(url, headers={"Accept": "application/vnd.github.v3.raw"}) + + if response.status_code != 200: + logger.error(f"Failed to fetch file content from GitHub: {response.status_code}") + return None + + return response.text + def parse_ruby_advisory(record, schema_type, advisory_url): """ diff --git a/vulnerabilities/tests/test_ruby.py b/vulnerabilities/tests/test_ruby.py index e66300512..433948e23 100644 --- a/vulnerabilities/tests/test_ruby.py +++ b/vulnerabilities/tests/test_ruby.py @@ -13,9 +13,11 @@ import pytest from packageurl import PackageURL from univers.version_range import GemVersionRange +from univers.versions import RubygemsVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importers.ruby import RubyImporter from vulnerabilities.importers.ruby import get_affected_packages from vulnerabilities.importers.ruby import parse_ruby_advisory from vulnerabilities.improvers.default import DefaultImprover @@ -94,3 +96,83 @@ def test_ruby_improver(mock_response): ) def test_get_affected_packages(record, purl, result): assert get_affected_packages(record, purl) == result + + +@pytest.fixture +def mock_github_api(monkeypatch): + test_files = { + "gems/sinatra/CVE-2018-7212.yml": open(os.path.join(TEST_DATA, "CVE-2018-7212.yml")).read(), + "gems/sinatra/CVE-2018-11627.yml": open( + os.path.join(TEST_DATA, "CVE-2018-11627.yml") + ).read(), + "rubies/CVE-2010-1330.yml": open(os.path.join(TEST_DATA, "CVE-2010-1330.yml")).read(), + "rubies/CVE-2007-5770.yml": open(os.path.join(TEST_DATA, "CVE-2007-5770.yml")).read(), + } + + dir_listing = { + "gems/sinatra": [ + "gems/sinatra/CVE-2018-7212.yml", + "gems/sinatra/CVE-2018-11627.yml", + ], + "rubies": [ + "rubies/CVE-2010-1330.yml", + "rubies/CVE-2007-5770.yml", + ], + } + + def mock_fetch_github_directory_content(self, path): + return dir_listing.get(path, []) + + def mock_fetch_github_file_content(self, path): + return test_files.get(path, "") + + monkeypatch.setattr( + RubyImporter, "_fetch_github_directory_content", mock_fetch_github_directory_content + ) + monkeypatch.setattr(RubyImporter, "_fetch_github_file_content", mock_fetch_github_file_content) + + +def test_package_first_mode_gem_affecting(mock_github_api): + purl = PackageURL(type="gem", name="sinatra") + importer = RubyImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert len(advisories) == 2 + assert all(a.affected_packages[0].package.name == "sinatra" for a in advisories) + + +def test_package_first_mode_gem_version(mock_github_api): + purl = PackageURL(type="gem", name="sinatra", version="1.2.7") + importer = RubyImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert len(advisories) == 2 + for adv in advisories: + affected = any( + purl.version + and ap.package.name == purl.name + and ap.affected_version_range + and ap.affected_version_range.contains(RubygemsVersion(purl.version)) + for ap in adv.affected_packages + ) + assert affected + + +def test_package_first_mode_gem_not_affecting(mock_github_api): + purl = PackageURL(type="gem", name="nonexistent", version="9.9.9") + importer = RubyImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert advisories == [] + + +def test_package_first_mode_ruby_engine(mock_github_api): + purl = PackageURL(type="ruby", name="jruby") + importer = RubyImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert len(advisories) == 1 + assert advisories[0].affected_packages[0].package.name == "jruby" + + +def test_package_first_mode_ruby_engine_not_affecting(mock_github_api): + purl = PackageURL(type="ruby", name="nonexistent") + importer = RubyImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert advisories == []