diff options
| author | Louis Burda <dev@sinitax.com> | 2026-01-30 03:04:01 +0100 |
|---|---|---|
| committer | Louis Burda <dev@sinitax.com> | 2026-01-30 03:04:01 +0100 |
| commit | f6487c615cff023db1574e2c23db78bf02a43709 (patch) | |
| tree | 8a0e793a8ea28b2a5eef5dcd509b6c6a2466ee1c /src/nvd/endpoints/cve_history.py | |
| download | nvdb-py-main.tar.gz nvdb-py-main.zip | |
Diffstat (limited to 'src/nvd/endpoints/cve_history.py')
| -rw-r--r-- | src/nvd/endpoints/cve_history.py | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/src/nvd/endpoints/cve_history.py b/src/nvd/endpoints/cve_history.py new file mode 100644 index 0000000..c20670d --- /dev/null +++ b/src/nvd/endpoints/cve_history.py @@ -0,0 +1,79 @@ +"""CVE Change History API endpoint.""" + +from typing import TYPE_CHECKING, AsyncIterator, List, Optional + +from ..models import CVEChange, CVEChangeResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CVEHistoryEndpoint: + """CVE Change History API endpoint.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_cve_history(self, cve_id: str) -> List[CVEChange]: + """Get complete change history for a CVE. + + Args: + cve_id: CVE identifier + + Returns: + List of change events + """ + changes: List[CVEChange] = [] + async for change in self.search_changes(cve_id=cve_id): + changes.append(change) + return changes + + async def search_changes( + self, + cve_id: Optional[str] = None, + change_start_date: Optional[str] = None, + change_end_date: Optional[str] = None, + event_name: Optional[str] = None, + results_per_page: int = 5000, + start_index: int = 0, + ) -> AsyncIterator[CVEChange]: + """Search CVE change history. + + Args: + cve_id: CVE identifier to get history for + change_start_date: Change start date (ISO-8601) + change_end_date: Change end date (ISO-8601) + event_name: Event type (e.g., "Initial Analysis", "CVE Modified") + results_per_page: Results per page (max 5000) + start_index: Starting index for pagination + + Yields: + Change event objects + """ + params = { + "cveId": cve_id, + "changeStartDate": change_start_date, + "changeEndDate": change_end_date, + "eventName": event_name, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cvehistory/2.0", + params=params, + response_model=CVEChangeResponse, + ) + + for change in response.cveChanges: + yield change + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage |
