aboutsummaryrefslogtreecommitdiffstats
path: root/src/nvd/endpoints/cve.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvd/endpoints/cve.py')
-rw-r--r--src/nvd/endpoints/cve.py205
1 files changed, 205 insertions, 0 deletions
diff --git a/src/nvd/endpoints/cve.py b/src/nvd/endpoints/cve.py
new file mode 100644
index 0000000..5029f59
--- /dev/null
+++ b/src/nvd/endpoints/cve.py
@@ -0,0 +1,205 @@
+"""CVE API endpoint."""
+
+from datetime import datetime
+from typing import TYPE_CHECKING, AsyncIterator, Optional
+
+from ..models import CVEData, CVEResponse
+
+if TYPE_CHECKING:
+ from ..client import NVDClient
+
+
+class CVEEndpoint:
+ """CVE API endpoint with full parameter support."""
+
+ def __init__(self, client: "NVDClient") -> None:
+ self.client = client
+
+ async def get_cve(self, cve_id: str) -> CVEData:
+ """Get a specific CVE by ID.
+
+ Args:
+ cve_id: CVE identifier (e.g., "CVE-2021-44228")
+
+ Returns:
+ CVE data object
+ """
+ response = await self.client.request(
+ "GET",
+ "/cves/2.0",
+ params={"cveId": cve_id},
+ response_model=CVEResponse,
+ )
+ if not response.vulnerabilities:
+ raise ValueError(f"CVE {cve_id} not found")
+ return response.vulnerabilities[0].cve
+
+ async def search_cves(
+ self,
+ # CVE identification
+ cve_id: Optional[str] = None,
+ # CPE filtering
+ cpe_name: Optional[str] = None,
+ virtual_match_string: Optional[str] = None,
+ # Date ranges (ISO-8601 format, max 120 days)
+ pub_start_date: Optional[str] = None,
+ pub_end_date: Optional[str] = None,
+ last_mod_start_date: Optional[str] = None,
+ last_mod_end_date: Optional[str] = None,
+ kev_start_date: Optional[str] = None,
+ kev_end_date: Optional[str] = None,
+ # CVSS v2 filtering
+ cvss_v2_severity: Optional[str] = None, # LOW, MEDIUM, HIGH
+ cvss_v2_metrics: Optional[str] = None,
+ # CVSS v3 filtering
+ cvss_v3_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL
+ cvss_v3_metrics: Optional[str] = None,
+ # CVSS v4 filtering
+ cvss_v4_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL
+ cvss_v4_metrics: Optional[str] = None,
+ # CWE filtering
+ cwe_id: Optional[str] = None, # e.g., "CWE-79"
+ # Boolean filters
+ has_cert_alerts: Optional[bool] = None,
+ has_cert_notes: Optional[bool] = None,
+ has_kev: Optional[bool] = None,
+ has_oval: Optional[bool] = None,
+ is_vulnerable: Optional[bool] = None,
+ no_rejected: Optional[bool] = None,
+ # Keyword search
+ keyword_search: Optional[str] = None,
+ keyword_exact_match: Optional[bool] = None,
+ # Source
+ source_identifier: Optional[str] = None,
+ # Version filtering (requires cpe_name)
+ version_start: Optional[str] = None,
+ version_start_type: Optional[str] = None, # "including" or "excluding"
+ version_end: Optional[str] = None,
+ version_end_type: Optional[str] = None, # "including" or "excluding"
+ # Pagination
+ results_per_page: int = 2000,
+ start_index: int = 0,
+ ) -> AsyncIterator[CVEData]:
+ """Search for CVEs with extensive filtering options.
+
+ Args:
+ cve_id: Specific CVE identifier
+ cpe_name: CPE 2.3 name
+ virtual_match_string: Virtual CPE match string
+ pub_start_date: Publication start date (ISO-8601)
+ pub_end_date: Publication end date (ISO-8601)
+ last_mod_start_date: Last modified start date (ISO-8601)
+ last_mod_end_date: Last modified end date (ISO-8601)
+ kev_start_date: KEV catalog start date (ISO-8601)
+ kev_end_date: KEV catalog end date (ISO-8601)
+ cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH)
+ cvss_v2_metrics: CVSS v2 vector string
+ cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL)
+ cvss_v3_metrics: CVSS v3 vector string
+ cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL)
+ cvss_v4_metrics: CVSS v4 vector string
+ cwe_id: CWE identifier (e.g., "CWE-79")
+ has_cert_alerts: Filter for CERT alerts
+ has_cert_notes: Filter for CERT notes
+ has_kev: Filter for CISA KEV catalog entries
+ has_oval: Filter for OVAL records
+ is_vulnerable: Filter for vulnerable CPE configurations
+ no_rejected: Exclude rejected CVEs
+ keyword_search: Keyword to search in descriptions
+ keyword_exact_match: Require exact keyword match
+ source_identifier: Data source identifier
+ version_start: Start version for CPE filtering
+ version_start_type: "including" or "excluding"
+ version_end: End version for CPE filtering
+ version_end_type: "including" or "excluding"
+ results_per_page: Results per page (max 2000)
+ start_index: Starting index for pagination
+
+ Yields:
+ CVE data objects
+ """
+ params = {
+ "cveId": cve_id,
+ "cpeName": cpe_name,
+ "virtualMatchString": virtual_match_string,
+ "pubStartDate": pub_start_date,
+ "pubEndDate": pub_end_date,
+ "lastModStartDate": last_mod_start_date,
+ "lastModEndDate": last_mod_end_date,
+ "kevStartDate": kev_start_date,
+ "kevEndDate": kev_end_date,
+ "cvssV2Severity": cvss_v2_severity,
+ "cvssV2Metrics": cvss_v2_metrics,
+ "cvssV3Severity": cvss_v3_severity,
+ "cvssV3Metrics": cvss_v3_metrics,
+ "cvssV4Severity": cvss_v4_severity,
+ "cvssV4Metrics": cvss_v4_metrics,
+ "cweId": cwe_id,
+ "hasCertAlerts": has_cert_alerts,
+ "hasCertNotes": has_cert_notes,
+ "hasKev": has_kev,
+ "hasOval": has_oval,
+ "isVulnerable": is_vulnerable,
+ "noRejected": no_rejected,
+ "keywordSearch": keyword_search,
+ "keywordExactMatch": keyword_exact_match,
+ "sourceIdentifier": source_identifier,
+ "versionStart": version_start,
+ "versionStartType": version_start_type,
+ "versionEnd": version_end,
+ "versionEndType": version_end_type,
+ "resultsPerPage": results_per_page,
+ "startIndex": start_index,
+ }
+
+ current_index = start_index
+ while True:
+ params["startIndex"] = current_index
+ response = await self.client.request(
+ "GET",
+ "/cves/2.0",
+ params=params,
+ response_model=CVEResponse,
+ )
+
+ for item in response.vulnerabilities:
+ yield item.cve
+
+ # Check if there are more results
+ if current_index + response.resultsPerPage >= response.totalResults:
+ break
+
+ current_index += response.resultsPerPage
+
+ async def get_cves_by_cpe(
+ self, cpe_name: str, **kwargs: object
+ ) -> AsyncIterator[CVEData]:
+ """Get CVEs for a specific CPE.
+
+ Args:
+ cpe_name: CPE 2.3 name
+ **kwargs: Additional search parameters
+
+ Yields:
+ CVE data objects
+ """
+ async for cve in self.search_cves(cpe_name=cpe_name, **kwargs):
+ yield cve
+
+ async def get_cves_by_keyword(
+ self, keyword: str, exact_match: bool = False, **kwargs: object
+ ) -> AsyncIterator[CVEData]:
+ """Search CVEs by keyword.
+
+ Args:
+ keyword: Keyword to search
+ exact_match: Require exact match
+ **kwargs: Additional search parameters
+
+ Yields:
+ CVE data objects
+ """
+ async for cve in self.search_cves(
+ keyword_search=keyword, keyword_exact_match=exact_match, **kwargs
+ ):
+ yield cve