diff options
Diffstat (limited to 'src/nvd/cli/commands/cpe.py')
| -rw-r--r-- | src/nvd/cli/commands/cpe.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/src/nvd/cli/commands/cpe.py b/src/nvd/cli/commands/cpe.py new file mode 100644 index 0000000..16a6cbc --- /dev/null +++ b/src/nvd/cli/commands/cpe.py @@ -0,0 +1,170 @@ +"""CPE CLI commands.""" + +import asyncio +import sys +from typing import Optional + +import typer +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ...client import NVDClient +from ..formatters import ( + format_cpe_table, + format_json, + format_json_lines, + format_match_criteria_table, + format_yaml, +) + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Query CPE (product) information from the NVD", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " # Keyword search (recommended for most searches)\n" + " nvdb cpe search --keyword 'apache'\n" + " nvdb cpe search --keyword 'windows 10'\n\n" + " # CPE match string (requires cpe:2.3 format)\n" + " nvdb cpe search --match-string 'cpe:2.3:a:microsoft:*'\n" + " nvdb cpe search --match-string 'cpe:2.3:o:linux:*'\n\n" + " # Get match criteria for a CVE\n" + " nvdb cpe matches --cve CVE-2021-44228", +) + +# Console for stderr (progress, errors, warnings) +console = Console(stderr=True) + + +@app.command("get") +def get_cpe( + cpe_id: str = typer.Argument(..., help="CPE Name UUID"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get details for a specific CPE by UUID.""" + + async def _get() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching CPE {cpe_id}...", total=None) + cpe = await client.cpe.get_cpe(cpe_id) + + if output_format == "json": + format_json(cpe) + elif output_format == "yaml": + format_yaml(cpe) + else: + format_cpe_table([cpe]) + + asyncio.run(_get()) + + +@app.command("search") +def search_cpes( + keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in CPE titles (e.g., 'windows' or 'apache')"), + match_string: Optional[str] = typer.Option(None, "--match-string", "-m", help="CPE match string in cpe:2.3 format (e.g., 'cpe:2.3:a:vendor:product:*')"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Search for CPEs. + + Use --keyword for simple text search in product names. + Use --match-string for CPE formatted strings (cpe:2.3:...). + """ + + async def _search() -> None: + from ...exceptions import NotFoundError + + results = [] + try: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Searching CPEs...", total=None) + + async for cpe in client.cpe.search_cpes( + keyword_search=keyword, + cpe_match_string=match_string, + ): + results.append(cpe) + if len(results) >= limit: + break + progress.update(task, description=f"Searching CPEs... ({len(results)} found)") + + if not results: + console.print("[yellow]No CPEs found matching criteria[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for cpe in results: + format_yaml(cpe) + console.print("---") + else: + format_cpe_table(results) + except NotFoundError as e: + console.print(f"[red]Error:[/red] {e.message}") + if match_string: + console.print("\n[yellow]Tip:[/yellow] Use --keyword for text search, or use CPE format for --match-string") + console.print("Example: [blue]nvdb cpe search --keyword 'soft-serve'[/blue]") + console.print("Or: [blue]nvdb cpe search --match-string 'cpe:2.3:a:*:soft*'[/blue]") + raise typer.Exit(1) + + asyncio.run(_search()) + + +@app.command("matches") +def get_matches( + cve_id: Optional[str] = typer.Option(None, "--cve", help="CVE ID to get match criteria for"), + match_criteria_id: Optional[str] = typer.Option(None, "--id", help="Specific match criteria UUID"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get CPE match criteria.""" + + async def _matches() -> None: + results = [] + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Fetching match criteria...", total=None) + + async for match in client.cpematch.search_match_criteria( + cve_id=cve_id, + match_criteria_id=match_criteria_id, + ): + results.append(match) + if len(results) >= limit: + break + progress.update(task, description=f"Fetching match criteria... ({len(results)} found)") + + if not results: + console.print("[yellow]No match criteria found[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for match in results: + format_yaml(match) + console.print("---") + else: + format_match_criteria_table(results) + + asyncio.run(_matches()) |
