"""CPE Match Criteria API endpoint.""" from typing import TYPE_CHECKING, AsyncIterator, List, Optional from ..models import CPEMatchResponse, CPEMatchString if TYPE_CHECKING: from ..client import NVDClient class CPEMatchEndpoint: """CPE Match Criteria API endpoint.""" def __init__(self, client: "NVDClient") -> None: self.client = client async def get_match_criteria(self, match_criteria_id: str) -> CPEMatchString: """Get specific match criteria by UUID. Args: match_criteria_id: Match criteria UUID Returns: Match criteria object """ response = await self.client.request( "GET", "/cpematch/2.0", params={"matchCriteriaId": match_criteria_id}, response_model=CPEMatchResponse, ) if not response.matchStrings: raise ValueError(f"Match criteria {match_criteria_id} not found") return response.matchStrings[0] async def get_cve_match_criteria(self, cve_id: str) -> List[CPEMatchString]: """Get all match criteria for a specific CVE. Args: cve_id: CVE identifier Returns: List of match criteria objects """ results: List[CPEMatchString] = [] async for match in self.search_match_criteria(cve_id=cve_id): results.append(match) return results async def search_match_criteria( self, cve_id: Optional[str] = None, match_criteria_id: Optional[str] = None, match_string_search: Optional[str] = None, last_mod_start_date: Optional[str] = None, last_mod_end_date: Optional[str] = None, results_per_page: int = 500, start_index: int = 0, ) -> AsyncIterator[CPEMatchString]: """Search for CPE match criteria. Args: cve_id: CVE identifier to get match strings for match_criteria_id: Specific match criteria UUID match_string_search: Match string pattern to search last_mod_start_date: Last modified start date (ISO-8601) last_mod_end_date: Last modified end date (ISO-8601) results_per_page: Results per page (max 500) start_index: Starting index for pagination Yields: Match criteria objects """ params = { "cveId": cve_id, "matchCriteriaId": match_criteria_id, "matchStringSearch": match_string_search, "lastModStartDate": last_mod_start_date, "lastModEndDate": last_mod_end_date, "resultsPerPage": results_per_page, "startIndex": start_index, } current_index = start_index while True: params["startIndex"] = current_index response = await self.client.request( "GET", "/cpematch/2.0", params=params, response_model=CPEMatchResponse, ) for match_string in response.matchStrings: yield match_string # Check if there are more results if current_index + response.resultsPerPage >= response.totalResults: break current_index += response.resultsPerPage