1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
"""CPE CLI commands."""
import asyncio
import sys
from typing import Optional
import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from ...client import NVDClient
from ..formatters import (
format_cpe_table,
format_json,
format_json_lines,
format_match_criteria_table,
format_yaml,
)
CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}
app = typer.Typer(
no_args_is_help=True,
help="Query CPE (product) information from the NVD",
context_settings=CONTEXT_SETTINGS,
epilog="Examples:\n"
" # Keyword search (recommended for most searches)\n"
" nvdb cpe search --keyword 'apache'\n"
" nvdb cpe search --keyword 'windows 10'\n\n"
" # CPE match string (requires cpe:2.3 format)\n"
" nvdb cpe search --match-string 'cpe:2.3:a:microsoft:*'\n"
" nvdb cpe search --match-string 'cpe:2.3:o:linux:*'\n\n"
" # Get match criteria for a CVE\n"
" nvdb cpe matches --cve CVE-2021-44228",
)
# Console for stderr (progress, errors, warnings)
console = Console(stderr=True)
@app.command("get")
def get_cpe(
cpe_id: str = typer.Argument(..., help="CPE Name UUID"),
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
"""Get details for a specific CPE by UUID."""
async def _get() -> None:
async with NVDClient(api_key=api_key) as client:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
progress.add_task(f"Fetching CPE {cpe_id}...", total=None)
cpe = await client.cpe.get_cpe(cpe_id)
if output_format == "json":
format_json(cpe)
elif output_format == "yaml":
format_yaml(cpe)
else:
format_cpe_table([cpe])
asyncio.run(_get())
@app.command("search")
def search_cpes(
keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in CPE titles (e.g., 'windows' or 'apache')"),
match_string: Optional[str] = typer.Option(None, "--match-string", "-m", help="CPE match string in cpe:2.3 format (e.g., 'cpe:2.3:a:vendor:product:*')"),
limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"),
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
"""Search for CPEs.
Use --keyword for simple text search in product names.
Use --match-string for CPE formatted strings (cpe:2.3:...).
"""
async def _search() -> None:
from ...exceptions import NotFoundError
results = []
try:
async with NVDClient(api_key=api_key) as client:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Searching CPEs...", total=None)
async for cpe in client.cpe.search_cpes(
keyword_search=keyword,
cpe_match_string=match_string,
):
results.append(cpe)
if len(results) >= limit:
break
progress.update(task, description=f"Searching CPEs... ({len(results)} found)")
if not results:
console.print("[yellow]No CPEs found matching criteria[/yellow]")
return
if output_format == "json":
format_json_lines(results)
elif output_format == "yaml":
for cpe in results:
format_yaml(cpe)
console.print("---")
else:
format_cpe_table(results)
except NotFoundError as e:
console.print(f"[red]Error:[/red] {e.message}")
if match_string:
console.print("\n[yellow]Tip:[/yellow] Use --keyword for text search, or use CPE format for --match-string")
console.print("Example: [blue]nvdb cpe search --keyword 'soft-serve'[/blue]")
console.print("Or: [blue]nvdb cpe search --match-string 'cpe:2.3:a:*:soft*'[/blue]")
raise typer.Exit(1)
asyncio.run(_search())
@app.command("matches")
def get_matches(
cve_id: Optional[str] = typer.Option(None, "--cve", help="CVE ID to get match criteria for"),
match_criteria_id: Optional[str] = typer.Option(None, "--id", help="Specific match criteria UUID"),
limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"),
api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
"""Get CPE match criteria."""
async def _matches() -> None:
results = []
async with NVDClient(api_key=api_key) as client:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Fetching match criteria...", total=None)
async for match in client.cpematch.search_match_criteria(
cve_id=cve_id,
match_criteria_id=match_criteria_id,
):
results.append(match)
if len(results) >= limit:
break
progress.update(task, description=f"Fetching match criteria... ({len(results)} found)")
if not results:
console.print("[yellow]No match criteria found[/yellow]")
return
if output_format == "json":
format_json_lines(results)
elif output_format == "yaml":
for match in results:
format_yaml(match)
console.print("---")
else:
format_match_criteria_table(results)
asyncio.run(_matches())
|