"""CPE API endpoint.""" from typing import TYPE_CHECKING, AsyncIterator, Optional from ..models import CPEData, CPEResponse if TYPE_CHECKING: from ..client import NVDClient class CPEEndpoint: """CPE (Common Platform Enumeration) API endpoint.""" def __init__(self, client: "NVDClient") -> None: self.client = client async def get_cpe(self, cpe_name_id: str) -> CPEData: """Get a specific CPE by UUID. Args: cpe_name_id: CPE Name UUID Returns: CPE data object """ response = await self.client.request( "GET", "/cpes/2.0", params={"cpeNameId": cpe_name_id}, response_model=CPEResponse, ) if not response.products: raise ValueError(f"CPE {cpe_name_id} not found") return response.products[0].cpe async def search_cpes( self, cpe_name_id: Optional[str] = None, cpe_match_string: Optional[str] = None, keyword_search: Optional[str] = None, keyword_exact_match: Optional[bool] = None, last_mod_start_date: Optional[str] = None, last_mod_end_date: Optional[str] = None, match_criteria_id: Optional[str] = None, results_per_page: int = 10000, start_index: int = 0, ) -> AsyncIterator[CPEData]: """Search for CPEs. Args: cpe_name_id: CPE Name UUID cpe_match_string: CPE match string pattern keyword_search: Keyword to search in titles and references keyword_exact_match: Require exact keyword match last_mod_start_date: Last modified start date (ISO-8601) last_mod_end_date: Last modified end date (ISO-8601) match_criteria_id: Match criteria UUID results_per_page: Results per page (max 10000) start_index: Starting index for pagination Yields: CPE data objects """ params = { "cpeNameId": cpe_name_id, "cpeMatchString": cpe_match_string, "keywordSearch": keyword_search, "keywordExactMatch": keyword_exact_match, "lastModStartDate": last_mod_start_date, "lastModEndDate": last_mod_end_date, "matchCriteriaId": match_criteria_id, "resultsPerPage": results_per_page, "startIndex": start_index, } current_index = start_index while True: params["startIndex"] = current_index response = await self.client.request( "GET", "/cpes/2.0", params=params, response_model=CPEResponse, ) for item in response.products: yield item.cpe # Check if there are more results if current_index + response.resultsPerPage >= response.totalResults: break current_index += response.resultsPerPage