1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
"""CVE API endpoint."""
from datetime import datetime
from typing import TYPE_CHECKING, AsyncIterator, Optional
from ..models import CVEData, CVEResponse
if TYPE_CHECKING:
from ..client import NVDClient
class CVEEndpoint:
"""CVE API endpoint with full parameter support."""
def __init__(self, client: "NVDClient") -> None:
self.client = client
async def get_cve(self, cve_id: str) -> CVEData:
"""Get a specific CVE by ID.
Args:
cve_id: CVE identifier (e.g., "CVE-2021-44228")
Returns:
CVE data object
"""
response = await self.client.request(
"GET",
"/cves/2.0",
params={"cveId": cve_id},
response_model=CVEResponse,
)
if not response.vulnerabilities:
raise ValueError(f"CVE {cve_id} not found")
return response.vulnerabilities[0].cve
async def search_cves(
self,
# CVE identification
cve_id: Optional[str] = None,
# CPE filtering
cpe_name: Optional[str] = None,
virtual_match_string: Optional[str] = None,
# Date ranges (ISO-8601 format, max 120 days)
pub_start_date: Optional[str] = None,
pub_end_date: Optional[str] = None,
last_mod_start_date: Optional[str] = None,
last_mod_end_date: Optional[str] = None,
kev_start_date: Optional[str] = None,
kev_end_date: Optional[str] = None,
# CVSS v2 filtering
cvss_v2_severity: Optional[str] = None, # LOW, MEDIUM, HIGH
cvss_v2_metrics: Optional[str] = None,
# CVSS v3 filtering
cvss_v3_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL
cvss_v3_metrics: Optional[str] = None,
# CVSS v4 filtering
cvss_v4_severity: Optional[str] = None, # LOW, MEDIUM, HIGH, CRITICAL
cvss_v4_metrics: Optional[str] = None,
# CWE filtering
cwe_id: Optional[str] = None, # e.g., "CWE-79"
# Boolean filters
has_cert_alerts: Optional[bool] = None,
has_cert_notes: Optional[bool] = None,
has_kev: Optional[bool] = None,
has_oval: Optional[bool] = None,
is_vulnerable: Optional[bool] = None,
no_rejected: Optional[bool] = None,
# Keyword search
keyword_search: Optional[str] = None,
keyword_exact_match: Optional[bool] = None,
# Source
source_identifier: Optional[str] = None,
# Version filtering (requires cpe_name)
version_start: Optional[str] = None,
version_start_type: Optional[str] = None, # "including" or "excluding"
version_end: Optional[str] = None,
version_end_type: Optional[str] = None, # "including" or "excluding"
# Pagination
results_per_page: int = 2000,
start_index: int = 0,
) -> AsyncIterator[CVEData]:
"""Search for CVEs with extensive filtering options.
Args:
cve_id: Specific CVE identifier
cpe_name: CPE 2.3 name
virtual_match_string: Virtual CPE match string
pub_start_date: Publication start date (ISO-8601)
pub_end_date: Publication end date (ISO-8601)
last_mod_start_date: Last modified start date (ISO-8601)
last_mod_end_date: Last modified end date (ISO-8601)
kev_start_date: KEV catalog start date (ISO-8601)
kev_end_date: KEV catalog end date (ISO-8601)
cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH)
cvss_v2_metrics: CVSS v2 vector string
cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL)
cvss_v3_metrics: CVSS v3 vector string
cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL)
cvss_v4_metrics: CVSS v4 vector string
cwe_id: CWE identifier (e.g., "CWE-79")
has_cert_alerts: Filter for CERT alerts
has_cert_notes: Filter for CERT notes
has_kev: Filter for CISA KEV catalog entries
has_oval: Filter for OVAL records
is_vulnerable: Filter for vulnerable CPE configurations
no_rejected: Exclude rejected CVEs
keyword_search: Keyword to search in descriptions
keyword_exact_match: Require exact keyword match
source_identifier: Data source identifier
version_start: Start version for CPE filtering
version_start_type: "including" or "excluding"
version_end: End version for CPE filtering
version_end_type: "including" or "excluding"
results_per_page: Results per page (max 2000)
start_index: Starting index for pagination
Yields:
CVE data objects
"""
params = {
"cveId": cve_id,
"cpeName": cpe_name,
"virtualMatchString": virtual_match_string,
"pubStartDate": pub_start_date,
"pubEndDate": pub_end_date,
"lastModStartDate": last_mod_start_date,
"lastModEndDate": last_mod_end_date,
"kevStartDate": kev_start_date,
"kevEndDate": kev_end_date,
"cvssV2Severity": cvss_v2_severity,
"cvssV2Metrics": cvss_v2_metrics,
"cvssV3Severity": cvss_v3_severity,
"cvssV3Metrics": cvss_v3_metrics,
"cvssV4Severity": cvss_v4_severity,
"cvssV4Metrics": cvss_v4_metrics,
"cweId": cwe_id,
"hasCertAlerts": has_cert_alerts,
"hasCertNotes": has_cert_notes,
"hasKev": has_kev,
"hasOval": has_oval,
"isVulnerable": is_vulnerable,
"noRejected": no_rejected,
"keywordSearch": keyword_search,
"keywordExactMatch": keyword_exact_match,
"sourceIdentifier": source_identifier,
"versionStart": version_start,
"versionStartType": version_start_type,
"versionEnd": version_end,
"versionEndType": version_end_type,
"resultsPerPage": results_per_page,
"startIndex": start_index,
}
current_index = start_index
while True:
params["startIndex"] = current_index
response = await self.client.request(
"GET",
"/cves/2.0",
params=params,
response_model=CVEResponse,
)
for item in response.vulnerabilities:
yield item.cve
# Check if there are more results
if current_index + response.resultsPerPage >= response.totalResults:
break
current_index += response.resultsPerPage
async def get_cves_by_cpe(
self, cpe_name: str, **kwargs: object
) -> AsyncIterator[CVEData]:
"""Get CVEs for a specific CPE.
Args:
cpe_name: CPE 2.3 name
**kwargs: Additional search parameters
Yields:
CVE data objects
"""
async for cve in self.search_cves(cpe_name=cpe_name, **kwargs):
yield cve
async def get_cves_by_keyword(
self, keyword: str, exact_match: bool = False, **kwargs: object
) -> AsyncIterator[CVEData]:
"""Search CVEs by keyword.
Args:
keyword: Keyword to search
exact_match: Require exact match
**kwargs: Additional search parameters
Yields:
CVE data objects
"""
async for cve in self.search_cves(
keyword_search=keyword, keyword_exact_match=exact_match, **kwargs
):
yield cve
|