aboutsummaryrefslogtreecommitdiffstats
path: root/src/nvd/endpoints
diff options
context:
space:
mode:
authorLouis Burda <dev@sinitax.com>2026-01-30 03:04:01 +0100
committerLouis Burda <dev@sinitax.com>2026-01-30 03:04:01 +0100
commitf6487c615cff023db1574e2c23db78bf02a43709 (patch)
tree8a0e793a8ea28b2a5eef5dcd509b6c6a2466ee1c /src/nvd/endpoints
downloadnvdb-py-main.tar.gz
nvdb-py-main.zip
Add initial versionHEADmain
Diffstat (limited to 'src/nvd/endpoints')
-rw-r--r--src/nvd/endpoints/__init__.py1
-rw-r--r--src/nvd/endpoints/cpe.py93
-rw-r--r--src/nvd/endpoints/cpematch.py101
-rw-r--r--src/nvd/endpoints/cve.py205
-rw-r--r--src/nvd/endpoints/cve_history.py79
-rw-r--r--src/nvd/endpoints/source.py81
6 files changed, 560 insertions, 0 deletions
diff --git a/src/nvd/endpoints/__init__.py b/src/nvd/endpoints/__init__.py
new file mode 100644
index 0000000..551a8fe
--- /dev/null
+++ b/src/nvd/endpoints/__init__.py
@@ -0,0 +1 @@
+"""NVD API endpoints."""
diff --git a/src/nvd/endpoints/cpe.py b/src/nvd/endpoints/cpe.py
new file mode 100644
index 0000000..9edd23a
--- /dev/null
+++ b/src/nvd/endpoints/cpe.py
@@ -0,0 +1,93 @@
+"""CPE API endpoint."""
+
+from typing import TYPE_CHECKING, AsyncIterator, Optional
+
+from ..models import CPEData, CPEResponse
+
+if TYPE_CHECKING:
+ from ..client import NVDClient
+
+
+class CPEEndpoint:
+ """CPE (Common Platform Enumeration) API endpoint."""
+
+ def __init__(self, client: "NVDClient") -> None:
+ self.client = client
+
+ async def get_cpe(self, cpe_name_id: str) -> CPEData:
+ """Get a specific CPE by UUID.
+
+ Args:
+ cpe_name_id: CPE Name UUID
+
+ Returns:
+ CPE data object
+ """
+ response = await self.client.request(
+ "GET",
+ "/cpes/2.0",
+ params={"cpeNameId": cpe_name_id},
+ response_model=CPEResponse,
+ )
+ if not response.products:
+ raise ValueError(f"CPE {cpe_name_id} not found")
+ return response.products[0].cpe
+
+ async def search_cpes(
+ self,
+ cpe_name_id: Optional[str] = None,
+ cpe_match_string: Optional[str] = None,
+ keyword_search: Optional[str] = None,
+ keyword_exact_match: Optional[bool] = None,
+ last_mod_start_date: Optional[str] = None,
+ last_mod_end_date: Optional[str] = None,
+ match_criteria_id: Optional[str] = None,
+ results_per_page: int = 10000,
+ start_index: int = 0,
+ ) -> AsyncIterator[CPEData]:
+ """Search for CPEs.
+
+ Args:
+ cpe_name_id: CPE Name UUID
+ cpe_match_string: CPE match string pattern
+ keyword_search: Keyword to search in titles and references
+ keyword_exact_match: Require exact keyword match
+ last_mod_start_date: Last modified start date (ISO-8601)
+ last_mod_end_date: Last modified end date (ISO-8601)
+ match_criteria_id: Match criteria UUID
+ results_per_page: Results per page (max 10000)
+ start_index: Starting index for pagination
+
+ Yields:
+ CPE data objects
+ """
+ params = {
+ "cpeNameId": cpe_name_id,
+ "cpeMatchString": cpe_match_string,
+ "keywordSearch": keyword_search,
+ "keywordExactMatch": keyword_exact_match,
+ "lastModStartDate": last_mod_start_date,
+ "lastModEndDate": last_mod_end_date,
+ "matchCriteriaId": match_criteria_id,
+ "resultsPerPage": results_per_page,
+ "startIndex": start_index,
+ }
+
+ current_index = start_index
+ while True:
+ params["startIndex"] = current_index
+ response = await self.client.request(
+ "GET",
+ "/cpes/2.0",
+ params=params,
+ response_model=CPEResponse,
+ )
+
+ for item in response.products:
+ yield item.cpe
+
+ # Check if there are more results
+ if current_index + response.resultsPerPage >= response.totalResults:
+ break
+
+ current_index += response.resultsPerPage
diff --git a/src/nvd/endpoints/cpematch.py b/src/nvd/endpoints/cpematch.py
new file mode 100644
index 0000000..0575859
--- /dev/null
+++ b/src/nvd/endpoints/cpematch.py
@@ -0,0 +1,101 @@
+"""CPE Match Criteria API endpoint."""
+
+from typing import TYPE_CHECKING, AsyncIterator, List, Optional
+
+from ..models import CPEMatchResponse, CPEMatchString
+
+if TYPE_CHECKING:
+ from ..client import NVDClient
+
+
+class CPEMatchEndpoint:
+ """CPE Match Criteria API endpoint."""
+
+ def __init__(self, client: "NVDClient") -> None:
+ self.client = client
+
+ async def get_match_criteria(self, match_criteria_id: str) -> CPEMatchString:
+ """Get specific match criteria by UUID.
+
+ Args:
+ match_criteria_id: Match criteria UUID
+
+ Returns:
+ Match criteria object
+ """
+ response = await self.client.request(
+ "GET",
+ "/cpematch/2.0",
+ params={"matchCriteriaId": match_criteria_id},
+ response_model=CPEMatchResponse,
+ )
+ if not response.matchStrings:
+ raise ValueError(f"Match criteria {match_criteria_id} not found")
+ return response.matchStrings[0]
+
+ async def get_cve_match_criteria(self, cve_id: str) -> List[CPEMatchString]:
+ """Get all match criteria for a specific CVE.
+
+ Args:
+ cve_id: CVE identifier
+
+ Returns:
+ List of match criteria objects
+ """
+ results: List[CPEMatchString] = []
+ async for match in self.search_match_criteria(cve_id=cve_id):
+ results.append(match)
+ return results
+
+ async def search_match_criteria(
+ self,
+ cve_id: Optional[str] = None,
+ match_criteria_id: Optional[str] = None,
+ match_string_search: Optional[str] = None,
+ last_mod_start_date: Optional[str] = None,
+ last_mod_end_date: Optional[str] = None,
+ results_per_page: int = 500,
+ start_index: int = 0,
+ ) -> AsyncIterator[CPEMatchString]:
+ """Search for CPE match criteria.
+
+ Args:
+ cve_id: CVE identifier to get match strings for
+ match_criteria_id: Specific match criteria UUID
+ match_string_search: Match string pattern to search
+ last_mod_start_date: Last modified start date (ISO-8601)
+ last_mod_end_date: Last modified end date (ISO-8601)
+ results_per_page: Results per page (max 500)
+ start_index: Starting index for pagination
+
+ Yields:
+ Match criteria objects
+ """
+ params = {
+ "cveId": cve_id,
+ "matchCriteriaId": match_criteria_id,
+ "matchStringSearch": match_string_search,
+ "lastModStartDate": last_mod_start_date,
+ "lastModEndDate": last_mod_end_date,
+ "resultsPerPage": results_per_page,
+ "startIndex": start_index,
+ }
+
+ current_index = start_index
+ while True:
+ params["startIndex"] = current_index
+ response = await self.client.request(
+ "GET",
+ "/cpematch/2.0",
+ params=params,
+ response_model=CPEMatchResponse,
+ )
+
+ for match_string in response.matchStrings:
+ yield match_string
+
+ # Check if there are more results
+ if current_index + response.resultsPerPage >= response.totalResults:
+ break
+
+ current_index += response.resultsPerPage
diff --git a/src/nvd/endpoints/cve.py b/src/nvd/endpoints/cve.py
new file mode 100644
index 0000000..5029f59
--- /dev/null
+++ b/src/nvd/endpoints/cve.py
@@ -0,0 +1,205 @@
+"""CVE API endpoint."""
+
+from datetime import datetime
+from typing import TYPE_CHECKING, AsyncIterator, Optional
+
+from ..models import CVEData, CVEResponse
+
+if TYPE_CHECKING:
+ from ..client import NVDClient
+
+
+class CVEEndpoint:
+ """CVE API endpoint with full parameter support."""
+
+ def __init__(self, client: "NVDClient") -> None:
+ self.client = client
+
+ async def get_cve(self, cve_id: str) -> CVEData:
+ """Get a specific CVE by ID.
+
+ Args:
+ cve_id: CVE identifier (e.g., "CVE-2021-44228")
+
+ Returns:
+ CVE data object
+ """
+ response = await self.client.request(
+ "GET",
+ "/cves/2.0",
+ params={"cveId": cve_id},
+ response_model=CVEResponse,
+ )
+ if not response.vulnerabilities:
+ raise ValueError(f"CVE {cve_id} not found")
+ return response.vulnerabilities[0].cve
+
+ async def search_cves(
+ self,
+ # CVE identification
+ cve_id: Optional[str] = None,
+ # CPE filtering
+ cpe_name: Optional[str] = None,
+ virtual_match_string: Optional[str] = None,
+ # Date ranges (ISO-8601 format, max 120 days)
+ pub_start_date: Optional[str] = None,
+ pub_end_date: Optional[str] = None,
+ last_mod_start_date: Optional[str] = None,
+ last_mod_end_date: Optional[str] = None,
+ kev_start_date: Optional[str] = None,
+ kev_end_date: Optional[str] = None,
+ # CVSS v2 filtering
+ cvss_v2_severity: Optional[str] = None, # LOW, MEDIUM, HIGH
+ cvss_v2_metrics: Optional[str] = None,
+ # CVSS v3 filtering
+ cvss_v3_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL
+ cvss_v3_metrics: Optional[str] = None,
+ # CVSS v4 filtering
+ cvss_v4_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL
+ cvss_v4_metrics: Optional[str] = None,
+ # CWE filtering
+ cwe_id: Optional[str] = None, # e.g., "CWE-79"
+ # Boolean filters
+ has_cert_alerts: Optional[bool] = None,
+ has_cert_notes: Optional[bool] = None,
+ has_kev: Optional[bool] = None,
+ has_oval: Optional[bool] = None,
+ is_vulnerable: Optional[bool] = None,
+ no_rejected: Optional[bool] = None,
+ # Keyword search
+ keyword_search: Optional[str] = None,
+ keyword_exact_match: Optional[bool] = None,
+ # Source
+ source_identifier: Optional[str] = None,
+ # Version filtering (requires cpe_name)
+ version_start: Optional[str] = None,
+ version_start_type: Optional[str] = None, # "including" or "excluding"
+ version_end: Optional[str] = None,
+ version_end_type: Optional[str] = None, # "including" or "excluding"
+ # Pagination
+ results_per_page: int = 2000,
+ start_index: int = 0,
+ ) -> AsyncIterator[CVEData]:
+ """Search for CVEs with extensive filtering options.
+
+ Args:
+ cve_id: Specific CVE identifier
+ cpe_name: CPE 2.3 name
+ virtual_match_string: Virtual CPE match string
+ pub_start_date: Publication start date (ISO-8601)
+ pub_end_date: Publication end date (ISO-8601)
+ last_mod_start_date: Last modified start date (ISO-8601)
+ last_mod_end_date: Last modified end date (ISO-8601)
+ kev_start_date: KEV catalog start date (ISO-8601)
+ kev_end_date: KEV catalog end date (ISO-8601)
+ cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH)
+ cvss_v2_metrics: CVSS v2 vector string
+ cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL)
+ cvss_v3_metrics: CVSS v3 vector string
+ cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL)
+ cvss_v4_metrics: CVSS v4 vector string
+ cwe_id: CWE identifier (e.g., "CWE-79")
+ has_cert_alerts: Filter for CERT alerts
+ has_cert_notes: Filter for CERT notes
+ has_kev: Filter for CISA KEV catalog entries
+ has_oval: Filter for OVAL records
+ is_vulnerable: Filter for vulnerable CPE configurations
+ no_rejected: Exclude rejected CVEs
+ keyword_search: Keyword to search in descriptions
+ keyword_exact_match: Require exact keyword match
+ source_identifier: Data source identifier
+ version_start: Start version for CPE filtering
+ version_start_type: "including" or "excluding"
+ version_end: End version for CPE filtering
+ version_end_type: "including" or "excluding"
+ results_per_page: Results per page (max 2000)
+ start_index: Starting index for pagination
+
+ Yields:
+ CVE data objects
+ """
+ params = {
+ "cveId": cve_id,
+ "cpeName": cpe_name,
+ "virtualMatchString": virtual_match_string,
+ "pubStartDate": pub_start_date,
+ "pubEndDate": pub_end_date,
+ "lastModStartDate": last_mod_start_date,
+ "lastModEndDate": last_mod_end_date,
+ "kevStartDate": kev_start_date,
+ "kevEndDate": kev_end_date,
+ "cvssV2Severity": cvss_v2_severity,
+ "cvssV2Metrics": cvss_v2_metrics,
+ "cvssV3Severity": cvss_v3_severity,
+ "cvssV3Metrics": cvss_v3_metrics,
+ "cvssV4Severity": cvss_v4_severity,
+ "cvssV4Metrics": cvss_v4_metrics,
+ "cweId": cwe_id,
+ "hasCertAlerts": has_cert_alerts,
+ "hasCertNotes": has_cert_notes,
+ "hasKev": has_kev,
+ "hasOval": has_oval,
+ "isVulnerable": is_vulnerable,
+ "noRejected": no_rejected,
+ "keywordSearch": keyword_search,
+ "keywordExactMatch": keyword_exact_match,
+ "sourceIdentifier": source_identifier,
+ "versionStart": version_start,
+ "versionStartType": version_start_type,
+ "versionEnd": version_end,
+ "versionEndType": version_end_type,
+ "resultsPerPage": results_per_page,
+ "startIndex": start_index,
+ }
+
+ current_index = start_index
+ while True:
+ params["startIndex"] = current_index
+ response = await self.client.request(
+ "GET",
+ "/cves/2.0",
+ params=params,
+ response_model=CVEResponse,
+ )
+
+ for item in response.vulnerabilities:
+ yield item.cve
+
+ # Check if there are more results
+ if current_index + response.resultsPerPage >= response.totalResults:
+ break
+
+ current_index += response.resultsPerPage
+
+ async def get_cves_by_cpe(
+ self, cpe_name: str, **kwargs: object
+ ) -> AsyncIterator[CVEData]:
+ """Get CVEs for a specific CPE.
+
+ Args:
+ cpe_name: CPE 2.3 name
+ **kwargs: Additional search parameters
+
+ Yields:
+ CVE data objects
+ """
+ async for cve in self.search_cves(cpe_name=cpe_name, **kwargs):
+ yield cve
+
+ async def get_cves_by_keyword(
+ self, keyword: str, exact_match: bool = False, **kwargs: object
+ ) -> AsyncIterator[CVEData]:
+ """Search CVEs by keyword.
+
+ Args:
+ keyword: Keyword to search
+ exact_match: Require exact match
+ **kwargs: Additional search parameters
+
+ Yields:
+ CVE data objects
+ """
+ async for cve in self.search_cves(
+ keyword_search=keyword, keyword_exact_match=exact_match, **kwargs
+ ):
+ yield cve
diff --git a/src/nvd/endpoints/cve_history.py b/src/nvd/endpoints/cve_history.py
new file mode 100644
index 0000000..c20670d
--- /dev/null
+++ b/src/nvd/endpoints/cve_history.py
@@ -0,0 +1,79 @@
+"""CVE Change History API endpoint."""
+
+from typing import TYPE_CHECKING, AsyncIterator, List, Optional
+
+from ..models import CVEChange, CVEChangeResponse
+
+if TYPE_CHECKING:
+ from ..client import NVDClient
+
+
+class CVEHistoryEndpoint:
+ """CVE Change History API endpoint."""
+
+ def __init__(self, client: "NVDClient") -> None:
+ self.client = client
+
+ async def get_cve_history(self, cve_id: str) -> List[CVEChange]:
+ """Get complete change history for a CVE.
+
+ Args:
+ cve_id: CVE identifier
+
+ Returns:
+ List of change events
+ """
+ changes: List[CVEChange] = []
+ async for change in self.search_changes(cve_id=cve_id):
+ changes.append(change)
+ return changes
+
+ async def search_changes(
+ self,
+ cve_id: Optional[str] = None,
+ change_start_date: Optional[str] = None,
+ change_end_date: Optional[str] = None,
+ event_name: Optional[str] = None,
+ results_per_page: int = 5000,
+ start_index: int = 0,
+ ) -> AsyncIterator[CVEChange]:
+ """Search CVE change history.
+
+ Args:
+ cve_id: CVE identifier to get history for
+ change_start_date: Change start date (ISO-8601)
+ change_end_date: Change end date (ISO-8601)
+ event_name: Event type (e.g., "Initial Analysis", "CVE Modified")
+ results_per_page: Results per page (max 5000)
+ start_index: Starting index for pagination
+
+ Yields:
+ Change event objects
+ """
+ params = {
+ "cveId": cve_id,
+ "changeStartDate": change_start_date,
+ "changeEndDate": change_end_date,
+ "eventName": event_name,
+ "resultsPerPage": results_per_page,
+ "startIndex": start_index,
+ }
+
+ current_index = start_index
+ while True:
+ params["startIndex"] = current_index
+ response = await self.client.request(
+ "GET",
+ "/cvehistory/2.0",
+ params=params,
+ response_model=CVEChangeResponse,
+ )
+
+ for change in response.cveChanges:
+ yield change
+
+ # Check if there are more results
+ if current_index + response.resultsPerPage >= response.totalResults:
+ break
+
+ current_index += response.resultsPerPage
diff --git a/src/nvd/endpoints/source.py b/src/nvd/endpoints/source.py
new file mode 100644
index 0000000..6235c96
--- /dev/null
+++ b/src/nvd/endpoints/source.py
@@ -0,0 +1,81 @@
+"""Source API endpoint."""
+
+from typing import TYPE_CHECKING, AsyncIterator, Optional
+
+from ..models import SourceData, SourceResponse
+
+if TYPE_CHECKING:
+ from ..client import NVDClient
+
+
+class SourceEndpoint:
+ """Source API endpoint for data source organizations."""
+
+ def __init__(self, client: "NVDClient") -> None:
+ self.client = client
+
+ async def get_source(self, source_identifier: str) -> SourceData:
+ """Get a specific source by identifier.
+
+ Args:
+ source_identifier: Source identifier
+
+ Returns:
+ Source data object
+ """
+ response = await self.client.request(
+ "GET",
+ "/source/2.0",
+ params={"sourceIdentifier": source_identifier},
+ response_model=SourceResponse,
+ )
+ if not response.sources:
+ raise ValueError(f"Source {source_identifier} not found")
+ return response.sources[0]
+
+ async def list_sources(
+ self,
+ source_identifier: Optional[str] = None,
+ last_mod_start_date: Optional[str] = None,
+ last_mod_end_date: Optional[str] = None,
+ results_per_page: int = 1000,
+ start_index: int = 0,
+ ) -> AsyncIterator[SourceData]:
+ """List data sources.
+
+ Args:
+ source_identifier: Filter by specific source identifier
+ last_mod_start_date: Last modified start date (ISO-8601)
+ last_mod_end_date: Last modified end date (ISO-8601)
+ results_per_page: Results per page (max 1000)
+ start_index: Starting index for pagination
+
+ Yields:
+ Source data objects
+ """
+ params = {
+ "sourceIdentifier": source_identifier,
+ "lastModStartDate": last_mod_start_date,
+ "lastModEndDate": last_mod_end_date,
+ "resultsPerPage": results_per_page,
+ "startIndex": start_index,
+ }
+
+ current_index = start_index
+ while True:
+ params["startIndex"] = current_index
+ response = await self.client.request(
+ "GET",
+ "/source/2.0",
+ params=params,
+ response_model=SourceResponse,
+ )
+
+ for source in response.sources:
+ yield source
+
+ # Check if there are more results
+ if current_index + response.resultsPerPage >= response.totalResults:
+ break
+
+ current_index += response.resultsPerPage