diff options
Diffstat (limited to 'src/nvd/endpoints/cpe.py')
| -rw-r--r-- | src/nvd/endpoints/cpe.py | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/src/nvd/endpoints/cpe.py b/src/nvd/endpoints/cpe.py new file mode 100644 index 0000000..9edd23a --- /dev/null +++ b/src/nvd/endpoints/cpe.py @@ -0,0 +1,93 @@ +"""CPE API endpoint.""" + +from typing import TYPE_CHECKING, AsyncIterator, Optional + +from ..models import CPEData, CPEResponse + +if TYPE_CHECKING: + from ..client import NVDClient + + +class CPEEndpoint: + """CPE (Common Platform Enumeration) API endpoint.""" + + def __init__(self, client: "NVDClient") -> None: + self.client = client + + async def get_cpe(self, cpe_name_id: str) -> CPEData: + """Get a specific CPE by UUID. + + Args: + cpe_name_id: CPE Name UUID + + Returns: + CPE data object + """ + response = await self.client.request( + "GET", + "/cpes/2.0", + params={"cpeNameId": cpe_name_id}, + response_model=CPEResponse, + ) + if not response.products: + raise ValueError(f"CPE {cpe_name_id} not found") + return response.products[0].cpe + + async def search_cpes( + self, + cpe_name_id: Optional[str] = None, + cpe_match_string: Optional[str] = None, + keyword_search: Optional[str] = None, + keyword_exact_match: Optional[bool] = None, + last_mod_start_date: Optional[str] = None, + last_mod_end_date: Optional[str] = None, + match_criteria_id: Optional[str] = None, + results_per_page: int = 10000, + start_index: int = 0, + ) -> AsyncIterator[CPEData]: + """Search for CPEs. + + Args: + cpe_name_id: CPE Name UUID + cpe_match_string: CPE match string pattern + keyword_search: Keyword to search in titles and references + keyword_exact_match: Require exact keyword match + last_mod_start_date: Last modified start date (ISO-8601) + last_mod_end_date: Last modified end date (ISO-8601) + match_criteria_id: Match criteria UUID + results_per_page: Results per page (max 10000) + start_index: Starting index for pagination + + Yields: + CPE data objects + """ + params = { + "cpeNameId": cpe_name_id, + "cpeMatchString": cpe_match_string, + "keywordSearch": keyword_search, + "keywordExactMatch": keyword_exact_match, + "lastModStartDate": last_mod_start_date, + "lastModEndDate": last_mod_end_date, + "matchCriteriaId": match_criteria_id, + "resultsPerPage": results_per_page, + "startIndex": start_index, + } + + current_index = start_index + while True: + params["startIndex"] = current_index + response = await self.client.request( + "GET", + "/cpes/2.0", + params=params, + response_model=CPEResponse, + ) + + for item in response.products: + yield item.cpe + + # Check if there are more results + if current_index + response.resultsPerPage >= response.totalResults: + break + + current_index += response.resultsPerPage |
