"""CVE API endpoint."""

from datetime import datetime
from typing import TYPE_CHECKING, AsyncIterator, Optional

from ..models import CVEData, CVEResponse

if TYPE_CHECKING:
    from ..client import NVDClient


class CVEEndpoint:
    """CVE API endpoint with full parameter support."""

    def __init__(self, client: "NVDClient") -> None:
        """Bind the endpoint to the client used to issue requests.

        Args:
            client: Client whose ``request`` coroutine performs the HTTP calls.
        """
        self.client = client

    async def get_cve(self, cve_id: str) -> CVEData:
        """Get a specific CVE by ID.

        Args:
            cve_id: CVE identifier (e.g., "CVE-2021-44228")

        Returns:
            CVE data object

        Raises:
            ValueError: If the response contains no vulnerabilities.
        """
        response = await self.client.request(
            "GET",
            "/cves/2.0",
            params={"cveId": cve_id},
            response_model=CVEResponse,
        )

        if not response.vulnerabilities:
            raise ValueError(f"CVE {cve_id} not found")

        return response.vulnerabilities[0].cve

    async def search_cves(
        self,
        # CVE identification
        cve_id: Optional[str] = None,
        # CPE filtering
        cpe_name: Optional[str] = None,
        virtual_match_string: Optional[str] = None,
        # Date ranges (ISO-8601 format, max 120 days)
        pub_start_date: Optional[str] = None,
        pub_end_date: Optional[str] = None,
        last_mod_start_date: Optional[str] = None,
        last_mod_end_date: Optional[str] = None,
        kev_start_date: Optional[str] = None,
        kev_end_date: Optional[str] = None,
        # CVSS v2 filtering
        cvss_v2_severity: Optional[str] = None,  # LOW, MEDIUM, HIGH
        cvss_v2_metrics: Optional[str] = None,
        # CVSS v3 filtering
        cvss_v3_severity: Optional[str] = None,  # LOW, MEDIUM, HIGH, CRITICAL
        cvss_v3_metrics: Optional[str] = None,
        # CVSS v4 filtering
        cvss_v4_severity: Optional[str] = None,  # LOW, MEDIUM, HIGH, CRITICAL
        cvss_v4_metrics: Optional[str] = None,
        # CWE filtering
        cwe_id: Optional[str] = None,  # e.g., "CWE-79"
        # Boolean filters
        has_cert_alerts: Optional[bool] = None,
        has_cert_notes: Optional[bool] = None,
        has_kev: Optional[bool] = None,
        has_oval: Optional[bool] = None,
        is_vulnerable: Optional[bool] = None,
        no_rejected: Optional[bool] = None,
        # Keyword search
        keyword_search: Optional[str] = None,
        keyword_exact_match: Optional[bool] = None,
        # Source
        source_identifier: Optional[str] = None,
        # Version filtering (requires cpe_name)
        version_start: Optional[str] = None,
        version_start_type: Optional[str] = None,  # "including" or "excluding"
        version_end: Optional[str] = None,
        version_end_type: Optional[str] = None,  # "including" or "excluding"
        # Pagination
        results_per_page: int = 2000,
        start_index: int = 0,
    ) -> AsyncIterator[CVEData]:
        """Search for CVEs with extensive filtering options.

        Pages through the results starting at ``start_index`` and yields each
        CVE as it is received. Filters left as ``None`` are omitted from the
        request parameters.

        Args:
            cve_id: Specific CVE identifier
            cpe_name: CPE 2.3 name
            virtual_match_string: Virtual CPE match string
            pub_start_date: Publication start date (ISO-8601)
            pub_end_date: Publication end date (ISO-8601)
            last_mod_start_date: Last modified start date (ISO-8601)
            last_mod_end_date: Last modified end date (ISO-8601)
            kev_start_date: KEV catalog start date (ISO-8601)
            kev_end_date: KEV catalog end date (ISO-8601)
            cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH)
            cvss_v2_metrics: CVSS v2 vector string
            cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL)
            cvss_v3_metrics: CVSS v3 vector string
            cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL)
            cvss_v4_metrics: CVSS v4 vector string
            cwe_id: CWE identifier (e.g., "CWE-79")
            has_cert_alerts: Filter for CERT alerts
            has_cert_notes: Filter for CERT notes
            has_kev: Filter for CISA KEV catalog entries
            has_oval: Filter for OVAL records
            is_vulnerable: Filter for vulnerable CPE configurations
            no_rejected: Exclude rejected CVEs
            keyword_search: Keyword to search in descriptions
            keyword_exact_match: Require exact keyword match
            source_identifier: Data source identifier
            version_start: Start version for CPE filtering
            version_start_type: "including" or "excluding"
            version_end: End version for CPE filtering
            version_end_type: "including" or "excluding"
            results_per_page: Results per page (max 2000)
            start_index: Starting index for pagination

        Yields:
            CVE data objects
        """
        candidate_params = {
            "cveId": cve_id,
            "cpeName": cpe_name,
            "virtualMatchString": virtual_match_string,
            "pubStartDate": pub_start_date,
            "pubEndDate": pub_end_date,
            "lastModStartDate": last_mod_start_date,
            "lastModEndDate": last_mod_end_date,
            "kevStartDate": kev_start_date,
            "kevEndDate": kev_end_date,
            "cvssV2Severity": cvss_v2_severity,
            "cvssV2Metrics": cvss_v2_metrics,
            "cvssV3Severity": cvss_v3_severity,
            "cvssV3Metrics": cvss_v3_metrics,
            "cvssV4Severity": cvss_v4_severity,
            "cvssV4Metrics": cvss_v4_metrics,
            "cweId": cwe_id,
            "hasCertAlerts": has_cert_alerts,
            "hasCertNotes": has_cert_notes,
            "hasKev": has_kev,
            "hasOval": has_oval,
            "isVulnerable": is_vulnerable,
            "noRejected": no_rejected,
            "keywordSearch": keyword_search,
            "keywordExactMatch": keyword_exact_match,
            "sourceIdentifier": source_identifier,
            "versionStart": version_start,
            "versionStartType": version_start_type,
            "versionEnd": version_end,
            "versionEndType": version_end_type,
            "resultsPerPage": results_per_page,
            "startIndex": start_index,
        }
        # Only forward filters the caller actually supplied, so the request
        # never depends on how the client serializes a None value.
        params = {
            key: value
            for key, value in candidate_params.items()
            if value is not None
        }

        current_index = start_index

        while True:
            params["startIndex"] = current_index

            response = await self.client.request(
                "GET",
                "/cves/2.0",
                params=params,
                response_model=CVEResponse,
            )

            for item in response.vulnerabilities:
                yield item.cve

            page_size = response.resultsPerPage

            # A page that reports no results cannot move the cursor forward;
            # stop instead of requesting the same index indefinitely.
            if page_size <= 0:
                break

            # Check if there are more results
            if current_index + page_size >= response.totalResults:
                break

            current_index += page_size

    async def get_cves_by_cpe(
        self, cpe_name: str, **kwargs: object
    ) -> AsyncIterator[CVEData]:
        """Get CVEs for a specific CPE.

        Thin wrapper that delegates to ``search_cves`` with ``cpe_name`` set.

        Args:
            cpe_name: CPE 2.3 name
            **kwargs: Additional search parameters

        Yields:
            CVE data objects
        """
        async for cve in self.search_cves(cpe_name=cpe_name, **kwargs):
            yield cve

    async def get_cves_by_keyword(
        self, keyword: str, exact_match: bool = False, **kwargs: object
    ) -> AsyncIterator[CVEData]:
        """Search CVEs by keyword.

        Thin wrapper that delegates to ``search_cves`` with the keyword
        filters set.

        Args:
            keyword: Keyword to search
            exact_match: Require exact match
            **kwargs: Additional search parameters

        Yields:
            CVE data objects
        """
        # NOTE(review): exact_match=False is forwarded as
        # keyword_exact_match=False rather than omitted -- confirm the client
        # and API treat that the same as "not set".
        async for cve in self.search_cves(
            keyword_search=keyword, keyword_exact_match=exact_match, **kwargs
        ):
            yield cve