1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
"""CVE CLI commands."""
import asyncio
import sys
from typing import Optional
import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from ...client import NVDClient
from ..formatters import format_cve_table, format_json, format_json_lines, format_yaml
CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}
app = typer.Typer(
no_args_is_help=True,
help="Query CVE (vulnerability) information from the NVD",
context_settings=CONTEXT_SETTINGS,
epilog="Examples:\n"
" nvdb cve get CVE-2021-44228\n"
" nvdb cve search --keyword 'sql injection' --severity HIGH\n"
" nvdb cve search --has-kev --limit 20\n"
" nvdb cve history CVE-2021-44228",
)
# Console for stderr (progress, errors, warnings)
console = Console(stderr=True)
@app.command("get")
def get_cve(
cve_id: str = typer.Argument(..., help="CVE ID (e.g., CVE-2021-44228)"),
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
"""Get details for a specific CVE."""
async def _get() -> None:
async with NVDClient(api_key=api_key) as client:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
progress.add_task(f"Fetching {cve_id}...", total=None)
cve = await client.cve.get_cve(cve_id)
if output_format == "json":
format_json(cve)
elif output_format == "yaml":
format_yaml(cve)
else:
format_cve_table([cve])
asyncio.run(_get())
@app.command("search")
def search_cves(
keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in descriptions"),
cpe: Optional[str] = typer.Option(None, "--cpe", help="CPE name to filter by"),
severity: Optional[str] = typer.Option(None, "--severity", "-s", help="CVSS v3 severity: LOW, MEDIUM, HIGH, CRITICAL"),
cvss_v2_severity: Optional[str] = typer.Option(None, "--cvss-v2-severity", help="CVSS v2 severity"),
cvss_v3_severity: Optional[str] = typer.Option(None, "--cvss-v3-severity", help="CVSS v3 severity"),
cwe_id: Optional[str] = typer.Option(None, "--cwe", help="CWE ID (e.g., CWE-79)"),
has_kev: bool = typer.Option(False, "--has-kev", help="Only CVEs in CISA KEV catalog"),
pub_start: Optional[str] = typer.Option(None, "--pub-start", help="Publication start date (ISO-8601)"),
pub_end: Optional[str] = typer.Option(None, "--pub-end", help="Publication end date (ISO-8601)"),
mod_start: Optional[str] = typer.Option(None, "--mod-start", help="Last modified start date (ISO-8601)"),
mod_end: Optional[str] = typer.Option(None, "--mod-end", help="Last modified end date (ISO-8601)"),
limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"),
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
"""Search for CVEs with various filters."""
async def _search() -> None:
results = []
async with NVDClient(api_key=api_key) as client:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Searching CVEs...", total=None)
async for cve in client.cve.search_cves(
keyword_search=keyword,
cpe_name=cpe,
cvss_v2_severity=cvss_v2_severity,
cvss_v3_severity=cvss_v3_severity or severity,
cwe_id=cwe_id,
has_kev=has_kev if has_kev else None,
pub_start_date=pub_start,
pub_end_date=pub_end,
last_mod_start_date=mod_start,
last_mod_end_date=mod_end,
):
results.append(cve)
if len(results) >= limit:
break
progress.update(task, description=f"Searching CVEs... ({len(results)} found)")
if not results:
console.print("[yellow]No CVEs found matching criteria[/yellow]")
return
if output_format == "json":
format_json_lines(results)
elif output_format == "yaml":
for cve in results:
format_yaml(cve)
console.print("---")
else:
format_cve_table(results)
asyncio.run(_search())
@app.command("history")
def get_history(
cve_id: str = typer.Argument(..., help="CVE ID to get history for"),
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
output_format: str = typer.Option("json", "--format", "-f", help="Output format: json, yaml"),
) -> None:
"""Get change history for a CVE."""
async def _history() -> None:
async with NVDClient(api_key=api_key) as client:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
progress.add_task(f"Fetching history for {cve_id}...", total=None)
history = await client.history.get_cve_history(cve_id)
if not history:
console.print(f"[yellow]No history found for {cve_id}[/yellow]")
return
if output_format == "yaml":
for change in history:
format_yaml(change)
console.print("---")
else:
format_json_lines(history)
asyncio.run(_history())
|