diff options
Diffstat (limited to 'src/nvd/endpoints')
| -rw-r--r-- | src/nvd/endpoints/__init__.py | 1 | ||||
| -rw-r--r-- | src/nvd/endpoints/cpe.py | 93 | ||||
| -rw-r--r-- | src/nvd/endpoints/cpematch.py | 101 | ||||
| -rw-r--r-- | src/nvd/endpoints/cve.py | 205 | ||||
| -rw-r--r-- | src/nvd/endpoints/cve_history.py | 79 | ||||
| -rw-r--r-- | src/nvd/endpoints/source.py | 81 |
6 files changed, 560 insertions, 0 deletions
diff --git a/src/nvd/endpoints/__init__.py b/src/nvd/endpoints/__init__.py new file mode 100644 index 0000000..551a8fe --- /dev/null +++ b/src/nvd/endpoints/__init__.py @@ -0,0 +1 @@ +"""NVD API endpoints.""" diff --git a/src/nvd/endpoints/cpe.py b/src/nvd/endpoints/cpe.py new file mode 100644 index 0000000..9edd23a --- /dev/null +++ b/src/nvd/endpoints/cpe.py @@ -0,0 +1,93 @@ +"""CPE API endpoint.""" + +from typing import TYPE_CHECKING, AsyncIterator, Optional + +from ..models import CPEData, CPEResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CPEEndpoint: + """CPE (Common Platform Enumeration) API endpoint.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_cpe(self, cpe_name_id: str) -> CPEData: + """Get a specific CPE by UUID. + + Args: + cpe_name_id: CPE Name UUID + + Returns: + CPE data object + """ + response = await self.client.request( + "GET", + "/cpes/2.0", + params={"cpeNameId": cpe_name_id}, + response_model=CPEResponse, + ) + if not response.products: + raise ValueError(f"CPE {cpe_name_id} not found") + return response.products[0].cpe + + async def search_cpes( + self, + cpe_name_id: Optional[str] = None, + cpe_match_string: Optional[str] = None, + keyword_search: Optional[str] = None, + keyword_exact_match: Optional[bool] = None, + last_mod_start_date: Optional[str] = None, + last_mod_end_date: Optional[str] = None, + match_criteria_id: Optional[str] = None, + results_per_page: int = 10000, + start_index: int = 0, + ) -> AsyncIterator[CPEData]: + """Search for CPEs. + + Args: + cpe_name_id: CPE Name UUID + cpe_match_string: CPE match string pattern + keyword_search: Keyword to search in titles and references + keyword_exact_match: Require exact keyword match + last_mod_start_date: Last modified start date (ISO-8601) + last_mod_end_date: Last modified end date (ISO-8601) + match_criteria_id: Match criteria UUID + results_per_page: Results per page (max 10000) + start_index: Starting index for pagination + + Yields: + CPE data objects + """ + params = { + "cpeNameId": cpe_name_id, + "cpeMatchString": cpe_match_string, + "keywordSearch": keyword_search, + "keywordExactMatch": keyword_exact_match, + "lastModStartDate": last_mod_start_date, + "lastModEndDate": last_mod_end_date, + "matchCriteriaId": match_criteria_id, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cpes/2.0", + params=params, + response_model=CPEResponse, + ) + + for item in response.products: + yield item.cpe + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage diff --git a/src/nvd/endpoints/cpematch.py b/src/nvd/endpoints/cpematch.py new file mode 100644 index 0000000..0575859 --- /dev/null +++ b/src/nvd/endpoints/cpematch.py @@ -0,0 +1,101 @@ +"""CPE Match Criteria API endpoint.""" + +from typing import TYPE_CHECKING, AsyncIterator, List, Optional + +from ..models import CPEMatchResponse, CPEMatchString + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CPEMatchEndpoint: + """CPE Match Criteria API endpoint.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_match_criteria(self, match_criteria_id: str) -> CPEMatchString: + """Get specific match criteria by UUID. + + Args: + match_criteria_id: Match criteria UUID + + Returns: + Match criteria object + """ + response = await self.client.request( + "GET", + "/cpematch/2.0", + params={"matchCriteriaId": match_criteria_id}, + response_model=CPEMatchResponse, + ) + if not response.matchStrings: + raise ValueError(f"Match criteria {match_criteria_id} not found") + return response.matchStrings[0] + + async def get_cve_match_criteria(self, cve_id: str) -> List[CPEMatchString]: + """Get all match criteria for a specific CVE. + + Args: + cve_id: CVE identifier + + Returns: + List of match criteria objects + """ + results: List[CPEMatchString] = [] + async for match in self.search_match_criteria(cve_id=cve_id): + results.append(match) + return results + + async def search_match_criteria( + self, + cve_id: Optional[str] = None, + match_criteria_id: Optional[str] = None, + match_string_search: Optional[str] = None, + last_mod_start_date: Optional[str] = None, + last_mod_end_date: Optional[str] = None, + results_per_page: int = 500, + start_index: int = 0, + ) -> AsyncIterator[CPEMatchString]: + """Search for CPE match criteria. + + Args: + cve_id: CVE identifier to get match strings for + match_criteria_id: Specific match criteria UUID + match_string_search: Match string pattern to search + last_mod_start_date: Last modified start date (ISO-8601) + last_mod_end_date: Last modified end date (ISO-8601) + results_per_page: Results per page (max 500) + start_index: Starting index for pagination + + Yields: + Match criteria objects + """ + params = { + "cveId": cve_id, + "matchCriteriaId": match_criteria_id, + "matchStringSearch": match_string_search, + "lastModStartDate": last_mod_start_date, + "lastModEndDate": last_mod_end_date, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cpematch/2.0", + params=params, + response_model=CPEMatchResponse, + ) + + for match_string in response.matchStrings: + yield match_string + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage diff --git a/src/nvd/endpoints/cve.py b/src/nvd/endpoints/cve.py new file mode 100644 index 0000000..5029f59 --- /dev/null +++ b/src/nvd/endpoints/cve.py @@ -0,0 +1,205 @@ +"""CVE API endpoint.""" + +from datetime import datetime +from typing import TYPE_CHECKING, AsyncIterator, Optional + +from ..models import CVEData, CVEResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CVEEndpoint: + """CVE API endpoint with full parameter support.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_cve(self, cve_id: str) -> CVEData: + """Get a specific CVE by ID. + + Args: + cve_id: CVE identifier (e.g., "CVE-2021-44228") + + Returns: + CVE data object + """ + response = await self.client.request( + "GET", + "/cves/2.0", + params={"cveId": cve_id}, + response_model=CVEResponse, + ) + if not response.vulnerabilities: + raise ValueError(f"CVE {cve_id} not found") + return response.vulnerabilities[0].cve + + async def search_cves( + self, + # CVE identification + cve_id: Optional[str] = None, + # CPE filtering + cpe_name: Optional[str] = None, + virtual_match_string: Optional[str] = None, + # Date ranges (ISO-8601 format, max 120 days) + pub_start_date: Optional[str] = None, + pub_end_date: Optional[str] = None, + last_mod_start_date: Optional[str] = None, + last_mod_end_date: Optional[str] = None, + kev_start_date: Optional[str] = None, + kev_end_date: Optional[str] = None, + # CVSS v2 filtering + cvss_v2_severity: Optional[str] = None, # LOW, MEDIUM, HIGH + cvss_v2_metrics: Optional[str] = None, + # CVSS v3 filtering + cvss_v3_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL + cvss_v3_metrics: Optional[str] = None, + # CVSS v4 filtering + cvss_v4_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL + cvss_v4_metrics: Optional[str] = None, + # CWE filtering + cwe_id: Optional[str] = None, # e.g., "CWE-79" + # Boolean filters + has_cert_alerts: Optional[bool] = None, + has_cert_notes: Optional[bool] = None, + has_kev: Optional[bool] = None, + has_oval: Optional[bool] = None, + is_vulnerable: Optional[bool] = None, + no_rejected: Optional[bool] = None, + # Keyword search + keyword_search: Optional[str] = None, + keyword_exact_match: Optional[bool] = None, + # Source + source_identifier: Optional[str] = None, + # Version filtering (requires cpe_name) + version_start: Optional[str] = None, + version_start_type: Optional[str] = None, # "including" or "excluding" + version_end: Optional[str] = None, + version_end_type: Optional[str] = None, # "including" or "excluding" + # Pagination + results_per_page: int = 2000, + start_index: int = 0, + ) -> AsyncIterator[CVEData]: + """Search for CVEs with extensive filtering options. + + Args: + cve_id: Specific CVE identifier + cpe_name: CPE 2.3 name + virtual_match_string: Virtual CPE match string + pub_start_date: Publication start date (ISO-8601) + pub_end_date: Publication end date (ISO-8601) + last_mod_start_date: Last modified start date (ISO-8601) + last_mod_end_date: Last modified end date (ISO-8601) + kev_start_date: KEV catalog start date (ISO-8601) + kev_end_date: KEV catalog end date (ISO-8601) + cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH) + cvss_v2_metrics: CVSS v2 vector string + cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL) + cvss_v3_metrics: CVSS v3 vector string + cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL) + cvss_v4_metrics: CVSS v4 vector string + cwe_id: CWE identifier (e.g., "CWE-79") + has_cert_alerts: Filter for CERT alerts + has_cert_notes: Filter for CERT notes + has_kev: Filter for CISA KEV catalog entries + has_oval: Filter for OVAL records + is_vulnerable: Filter for vulnerable CPE configurations + no_rejected: Exclude rejected CVEs + keyword_search: Keyword to search in descriptions + keyword_exact_match: Require exact keyword match + source_identifier: Data source identifier + version_start: Start version for CPE filtering + version_start_type: "including" or "excluding" + version_end: End version for CPE filtering + version_end_type: "including" or "excluding" + results_per_page: Results per page (max 2000) + start_index: Starting index for pagination + + Yields: + CVE data objects + """ + params = { + "cveId": cve_id, + "cpeName": cpe_name, + "virtualMatchString": virtual_match_string, + "pubStartDate": pub_start_date, + "pubEndDate": pub_end_date, + "lastModStartDate": last_mod_start_date, + "lastModEndDate": last_mod_end_date, + "kevStartDate": kev_start_date, + "kevEndDate": kev_end_date, + "cvssV2Severity": cvss_v2_severity, + "cvssV2Metrics": cvss_v2_metrics, + "cvssV3Severity": cvss_v3_severity, + "cvssV3Metrics": cvss_v3_metrics, + "cvssV4Severity": cvss_v4_severity, + "cvssV4Metrics": cvss_v4_metrics, + "cweId": cwe_id, + "hasCertAlerts": has_cert_alerts, + "hasCertNotes": has_cert_notes, + "hasKev": has_kev, + "hasOval": has_oval, + "isVulnerable": is_vulnerable, + "noRejected": no_rejected, + "keywordSearch": keyword_search, + "keywordExactMatch": keyword_exact_match, + "sourceIdentifier": source_identifier, + "versionStart": version_start, + "versionStartType": version_start_type, + "versionEnd": version_end, + "versionEndType": version_end_type, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cves/2.0", + params=params, + response_model=CVEResponse, + ) + + for item in response.vulnerabilities: + yield item.cve + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage + + async def get_cves_by_cpe( + self, cpe_name: str, **kwargs: object + ) -> AsyncIterator[CVEData]: + """Get CVEs for a specific CPE. + + Args: + cpe_name: CPE 2.3 name + **kwargs: Additional search parameters + + Yields: + CVE data objects + """ + async for cve in self.search_cves(cpe_name=cpe_name, **kwargs): + yield cve + + async def get_cves_by_keyword( + self, keyword: str, exact_match: bool = False, **kwargs: object + ) -> AsyncIterator[CVEData]: + """Search CVEs by keyword. + + Args: + keyword: Keyword to search + exact_match: Require exact match + **kwargs: Additional search parameters + + Yields: + CVE data objects + """ + async for cve in self.search_cves( + keyword_search=keyword, keyword_exact_match=exact_match, **kwargs + ): + yield cve diff --git a/src/nvd/endpoints/cve_history.py b/src/nvd/endpoints/cve_history.py new file mode 100644 index 0000000..c20670d --- /dev/null +++ b/src/nvd/endpoints/cve_history.py @@ -0,0 +1,79 @@ +"""CVE Change History API endpoint.""" + +from typing import TYPE_CHECKING, AsyncIterator, List, Optional + +from ..models import CVEChange, CVEChangeResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CVEHistoryEndpoint: + """CVE Change History API endpoint.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_cve_history(self, cve_id: str) -> List[CVEChange]: + """Get complete change history for a CVE. + + Args: + cve_id: CVE identifier + + Returns: + List of change events + """ + changes: List[CVEChange] = [] + async for change in self.search_changes(cve_id=cve_id): + changes.append(change) + return changes + + async def search_changes( + self, + cve_id: Optional[str] = None, + change_start_date: Optional[str] = None, + change_end_date: Optional[str] = None, + event_name: Optional[str] = None, + results_per_page: int = 5000, + start_index: int = 0, + ) -> AsyncIterator[CVEChange]: + """Search CVE change history. + + Args: + cve_id: CVE identifier to get history for + change_start_date: Change start date (ISO-8601) + change_end_date: Change end date (ISO-8601) + event_name: Event type (e.g., "Initial Analysis", "CVE Modified") + results_per_page: Results per page (max 5000) + start_index: Starting index for pagination + + Yields: + Change event objects + """ + params = { + "cveId": cve_id, + "changeStartDate": change_start_date, + "changeEndDate": change_end_date, + "eventName": event_name, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cvehistory/2.0", + params=params, + response_model=CVEChangeResponse, + ) + + for change in response.cveChanges: + yield change + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage diff --git a/src/nvd/endpoints/source.py b/src/nvd/endpoints/source.py new file mode 100644 index 0000000..6235c96 --- /dev/null +++ b/src/nvd/endpoints/source.py @@ -0,0 +1,81 @@ +"""Source API endpoint.""" + +from typing import TYPE_CHECKING, AsyncIterator, Optional + +from ..models import SourceData, SourceResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class SourceEndpoint: + """Source API endpoint for data source organizations.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_source(self, source_identifier: str) -> SourceData: + """Get a specific source by identifier. + + Args: + source_identifier: Source identifier + + Returns: + Source data object + """ + response = await self.client.request( + "GET", + "/source/2.0", + params={"sourceIdentifier": source_identifier}, + response_model=SourceResponse, + ) + if not response.sources: + raise ValueError(f"Source {source_identifier} not found") + return response.sources[0] + + async def list_sources( + self, + source_identifier: Optional[str] = None, + last_mod_start_date: Optional[str] = None, + last_mod_end_date: Optional[str] = None, + results_per_page: int = 1000, + start_index: int = 0, + ) -> AsyncIterator[SourceData]: + """List data sources. + + Args: + source_identifier: Filter by specific source identifier + last_mod_start_date: Last modified start date (ISO-8601) + last_mod_end_date: Last modified end date (ISO-8601) + results_per_page: Results per page (max 1000) + start_index: Starting index for pagination + + Yields: + Source data objects + """ + params = { + "sourceIdentifier": source_identifier, + "lastModStartDate": last_mod_start_date, + "lastModEndDate": last_mod_end_date, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/source/2.0", + params=params, + response_model=SourceResponse, + ) + + for source in response.sources: + yield source + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage |
