diff options
Diffstat (limited to 'src/nvd/endpoints/cve.py')
| -rw-r--r-- | src/nvd/endpoints/cve.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/src/nvd/endpoints/cve.py b/src/nvd/endpoints/cve.py new file mode 100644 index 0000000..5029f59 --- /dev/null +++ b/src/nvd/endpoints/cve.py @@ -0,0 +1,205 @@ +"""CVE API endpoint.""" + +from datetime import datetime +from typing import TYPE_CHECKING, AsyncIterator, Optional + +from ..models import CVEData, CVEResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CVEEndpoint: + """CVE API endpoint with full parameter support.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_cve(self, cve_id: str) -> CVEData: + """Get a specific CVE by ID. + + Args: + cve_id: CVE identifier (e.g., "CVE-2021-44228") + + Returns: + CVE data object + """ + response = await self.client.request( + "GET", + "/cves/2.0", + params={"cveId": cve_id}, + response_model=CVEResponse, + ) + if not response.vulnerabilities: + raise ValueError(f"CVE {cve_id} not found") + return response.vulnerabilities[0].cve + + async def search_cves( + self, + # CVE identification + cve_id: Optional[str] = None, + # CPE filtering + cpe_name: Optional[str] = None, + virtual_match_string: Optional[str] = None, + # Date ranges (ISO-8601 format, max 120 days) + pub_start_date: Optional[str] = None, + pub_end_date: Optional[str] = None, + last_mod_start_date: Optional[str] = None, + last_mod_end_date: Optional[str] = None, + kev_start_date: Optional[str] = None, + kev_end_date: Optional[str] = None, + # CVSS v2 filtering + cvss_v2_severity: Optional[str] = None, # LOW, MEDIUM, HIGH + cvss_v2_metrics: Optional[str] = None, + # CVSS v3 filtering + cvss_v3_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL + cvss_v3_metrics: Optional[str] = None, + # CVSS v4 filtering + cvss_v4_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL + cvss_v4_metrics: Optional[str] = None, + # CWE filtering + cwe_id: Optional[str] = None, # e.g., "CWE-79" + # Boolean filters + has_cert_alerts: Optional[bool] = None, + has_cert_notes: Optional[bool] = None, + has_kev: Optional[bool] = None, + has_oval: Optional[bool] = None, + is_vulnerable: Optional[bool] = None, + no_rejected: Optional[bool] = None, + # Keyword search + keyword_search: Optional[str] = None, + keyword_exact_match: Optional[bool] = None, + # Source + source_identifier: Optional[str] = None, + # Version filtering (requires cpe_name) + version_start: Optional[str] = None, + version_start_type: Optional[str] = None, # "including" or "excluding" + version_end: Optional[str] = None, + version_end_type: Optional[str] = None, # "including" or "excluding" + # Pagination + results_per_page: int = 2000, + start_index: int = 0, + ) -> AsyncIterator[CVEData]: + """Search for CVEs with extensive filtering options. + + Args: + cve_id: Specific CVE identifier + cpe_name: CPE 2.3 name + virtual_match_string: Virtual CPE match string + pub_start_date: Publication start date (ISO-8601) + pub_end_date: Publication end date (ISO-8601) + last_mod_start_date: Last modified start date (ISO-8601) + last_mod_end_date: Last modified end date (ISO-8601) + kev_start_date: KEV catalog start date (ISO-8601) + kev_end_date: KEV catalog end date (ISO-8601) + cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH) + cvss_v2_metrics: CVSS v2 vector string + cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL) + cvss_v3_metrics: CVSS v3 vector string + cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL) + cvss_v4_metrics: CVSS v4 vector string + cwe_id: CWE identifier (e.g., "CWE-79") + has_cert_alerts: Filter for CERT alerts + has_cert_notes: Filter for CERT notes + has_kev: Filter for CISA KEV catalog entries + has_oval: Filter for OVAL records + is_vulnerable: Filter for vulnerable CPE configurations + no_rejected: Exclude rejected CVEs + keyword_search: Keyword to search in descriptions + keyword_exact_match: Require exact keyword match + source_identifier: Data source identifier + version_start: Start version for CPE filtering + version_start_type: "including" or "excluding" + version_end: End version for CPE filtering + version_end_type: "including" or "excluding" + results_per_page: Results per page (max 2000) + start_index: Starting index for pagination + + Yields: + CVE data objects + """ + params = { + "cveId": cve_id, + "cpeName": cpe_name, + "virtualMatchString": virtual_match_string, + "pubStartDate": pub_start_date, + "pubEndDate": pub_end_date, + "lastModStartDate": last_mod_start_date, + "lastModEndDate": last_mod_end_date, + "kevStartDate": kev_start_date, + "kevEndDate": kev_end_date, + "cvssV2Severity": cvss_v2_severity, + "cvssV2Metrics": cvss_v2_metrics, + "cvssV3Severity": cvss_v3_severity, + "cvssV3Metrics": cvss_v3_metrics, + "cvssV4Severity": cvss_v4_severity, + "cvssV4Metrics": cvss_v4_metrics, + "cweId": cwe_id, + "hasCertAlerts": has_cert_alerts, + "hasCertNotes": has_cert_notes, + "hasKev": has_kev, + "hasOval": has_oval, + "isVulnerable": is_vulnerable, + "noRejected": no_rejected, + "keywordSearch": keyword_search, + "keywordExactMatch": keyword_exact_match, + "sourceIdentifier": source_identifier, + "versionStart": version_start, + "versionStartType": version_start_type, + "versionEnd": version_end, + "versionEndType": version_end_type, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cves/2.0", + params=params, + response_model=CVEResponse, + ) + + for item in response.vulnerabilities: + yield item.cve + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage + + async def get_cves_by_cpe( + self, cpe_name: str, **kwargs: object + ) -> AsyncIterator[CVEData]: + """Get CVEs for a specific CPE. + + Args: + cpe_name: CPE 2.3 name + **kwargs: Additional search parameters + + Yields: + CVE data objects + """ + async for cve in self.search_cves(cpe_name=cpe_name, **kwargs): + yield cve + + async def get_cves_by_keyword( + self, keyword: str, exact_match: bool = False, **kwargs: object + ) -> AsyncIterator[CVEData]: + """Search CVEs by keyword. + + Args: + keyword: Keyword to search + exact_match: Require exact match + **kwargs: Additional search parameters + + Yields: + CVE data objects + """ + async for cve in self.search_cves( + keyword_search=keyword, keyword_exact_match=exact_match, **kwargs + ): + yield cve |
