aboutsummaryrefslogtreecommitdiffstats
path: root/src/nvd/endpoints/cve.py
blob: 5029f59daeeb12c2196e6fcbc0c50e7d7649475e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""CVE API endpoint."""

from datetime import datetime
from typing import TYPE_CHECKING, AsyncIterator, Optional

from ..models import CVEData, CVEResponse

if TYPE_CHECKING:
    from ..client import NVDClient


class CVEEndpoint:
    """CVE API endpoint with full parameter support."""

    def __init__(self, client: "NVDClient") -> None:
        self.client = client

    async def get_cve(self, cve_id: str) -> CVEData:
        """Get a specific CVE by ID.

        Args:
            cve_id: CVE identifier (e.g., "CVE-2021-44228")

        Returns:
            CVE data object
        """
        response = await self.client.request(
            "GET",
            "/cves/2.0",
            params={"cveId": cve_id},
            response_model=CVEResponse,
        )
        if not response.vulnerabilities:
            raise ValueError(f"CVE {cve_id} not found")
        return response.vulnerabilities[0].cve

    async def search_cves(
        self,
        # CVE identification
        cve_id: Optional[str] = None,
        # CPE filtering
        cpe_name: Optional[str] = None,
        virtual_match_string: Optional[str] = None,
        # Date ranges (ISO-8601 format, max 120 days)
        pub_start_date: Optional[str] = None,
        pub_end_date: Optional[str] = None,
        last_mod_start_date: Optional[str] = None,
        last_mod_end_date: Optional[str] = None,
        kev_start_date: Optional[str] = None,
        kev_end_date: Optional[str] = None,
        # CVSS v2 filtering
        cvss_v2_severity: Optional[str] = None,  # LOW, MEDIUM, HIGH
        cvss_v2_metrics: Optional[str] = None,
        # CVSS v3 filtering
        cvss_v3_severity: Optional[str] = None,  # LOW, MEDIUM, HIGH, CRITICAL
        cvss_v3_metrics: Optional[str] = None,
        # CVSS v4 filtering
        cvss_v4_severity: Optional[str] = None,  # LOW, MEDIUM, HIGH, CRITICAL
        cvss_v4_metrics: Optional[str] = None,
        # CWE filtering
        cwe_id: Optional[str] = None,  # e.g., "CWE-79"
        # Boolean filters
        has_cert_alerts: Optional[bool] = None,
        has_cert_notes: Optional[bool] = None,
        has_kev: Optional[bool] = None,
        has_oval: Optional[bool] = None,
        is_vulnerable: Optional[bool] = None,
        no_rejected: Optional[bool] = None,
        # Keyword search
        keyword_search: Optional[str] = None,
        keyword_exact_match: Optional[bool] = None,
        # Source
        source_identifier: Optional[str] = None,
        # Version filtering (requires cpe_name)
        version_start: Optional[str] = None,
        version_start_type: Optional[str] = None,  # "including" or "excluding"
        version_end: Optional[str] = None,
        version_end_type: Optional[str] = None,  # "including" or "excluding"
        # Pagination
        results_per_page: int = 2000,
        start_index: int = 0,
    ) -> AsyncIterator[CVEData]:
        """Search for CVEs with extensive filtering options.

        Args:
            cve_id: Specific CVE identifier
            cpe_name: CPE 2.3 name
            virtual_match_string: Virtual CPE match string
            pub_start_date: Publication start date (ISO-8601)
            pub_end_date: Publication end date (ISO-8601)
            last_mod_start_date: Last modified start date (ISO-8601)
            last_mod_end_date: Last modified end date (ISO-8601)
            kev_start_date: KEV catalog start date (ISO-8601)
            kev_end_date: KEV catalog end date (ISO-8601)
            cvss_v2_severity: CVSS v2 severity (LOW, MEDIUM, HIGH)
            cvss_v2_metrics: CVSS v2 vector string
            cvss_v3_severity: CVSS v3 severity (LOW, MEDIUM, HIGH, CRITICAL)
            cvss_v3_metrics: CVSS v3 vector string
            cvss_v4_severity: CVSS v4 severity (LOW, MEDIUM, HIGH, CRITICAL)
            cvss_v4_metrics: CVSS v4 vector string
            cwe_id: CWE identifier (e.g., "CWE-79")
            has_cert_alerts: Filter for CERT alerts
            has_cert_notes: Filter for CERT notes
            has_kev: Filter for CISA KEV catalog entries
            has_oval: Filter for OVAL records
            is_vulnerable: Filter for vulnerable CPE configurations
            no_rejected: Exclude rejected CVEs
            keyword_search: Keyword to search in descriptions
            keyword_exact_match: Require exact keyword match
            source_identifier: Data source identifier
            version_start: Start version for CPE filtering
            version_start_type: "including" or "excluding"
            version_end: End version for CPE filtering
            version_end_type: "including" or "excluding"
            results_per_page: Results per page (max 2000)
            start_index: Starting index for pagination

        Yields:
            CVE data objects
        """
        params = {
            "cveId": cve_id,
            "cpeName": cpe_name,
            "virtualMatchString": virtual_match_string,
            "pubStartDate": pub_start_date,
            "pubEndDate": pub_end_date,
            "lastModStartDate": last_mod_start_date,
            "lastModEndDate": last_mod_end_date,
            "kevStartDate": kev_start_date,
            "kevEndDate": kev_end_date,
            "cvssV2Severity": cvss_v2_severity,
            "cvssV2Metrics": cvss_v2_metrics,
            "cvssV3Severity": cvss_v3_severity,
            "cvssV3Metrics": cvss_v3_metrics,
            "cvssV4Severity": cvss_v4_severity,
            "cvssV4Metrics": cvss_v4_metrics,
            "cweId": cwe_id,
            "hasCertAlerts": has_cert_alerts,
            "hasCertNotes": has_cert_notes,
            "hasKev": has_kev,
            "hasOval": has_oval,
            "isVulnerable": is_vulnerable,
            "noRejected": no_rejected,
            "keywordSearch": keyword_search,
            "keywordExactMatch": keyword_exact_match,
            "sourceIdentifier": source_identifier,
            "versionStart": version_start,
            "versionStartType": version_start_type,
            "versionEnd": version_end,
            "versionEndType": version_end_type,
            "resultsPerPage": results_per_page,
            "startIndex": start_index,
        }

        current_index = start_index
        while True:
            params["startIndex"] = current_index
            response = await self.client.request(
                "GET",
                "/cves/2.0",
                params=params,
                response_model=CVEResponse,
            )

            for item in response.vulnerabilities:
                yield item.cve

            # Check if there are more results
            if current_index + response.resultsPerPage >= response.totalResults:
                break

            current_index += response.resultsPerPage

    async def get_cves_by_cpe(
        self, cpe_name: str, **kwargs: object
    ) -> AsyncIterator[CVEData]:
        """Get CVEs for a specific CPE.

        Args:
            cpe_name: CPE 2.3 name
            **kwargs: Additional search parameters

        Yields:
            CVE data objects
        """
        async for cve in self.search_cves(cpe_name=cpe_name, **kwargs):
            yield cve

    async def get_cves_by_keyword(
        self, keyword: str, exact_match: bool = False, **kwargs: object
    ) -> AsyncIterator[CVEData]:
        """Search CVEs by keyword.

        Args:
            keyword: Keyword to search
            exact_match: Require exact match
            **kwargs: Additional search parameters

        Yields:
            CVE data objects
        """
        async for cve in self.search_cves(
            keyword_search=keyword, keyword_exact_match=exact_match, **kwargs
        ):
            yield cve