"""CPE CLI commands."""

import asyncio
import sys
from typing import Any, Callable, List, Optional

import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn

from ...client import NVDClient
from ..formatters import (
    format_cpe_table,
    format_json,
    format_json_lines,
    format_match_criteria_table,
    format_yaml,
)

CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}

app = typer.Typer(
    no_args_is_help=True,
    help="Query CPE (product) information from the NVD",
    context_settings=CONTEXT_SETTINGS,
    epilog="Examples:\n"
    " # Keyword search (recommended for most searches)\n"
    " nvdb cpe search --keyword 'apache'\n"
    " nvdb cpe search --keyword 'windows 10'\n\n"
    " # CPE match string (requires cpe:2.3 format)\n"
    " nvdb cpe search --match-string 'cpe:2.3:a:microsoft:*'\n"
    " nvdb cpe search --match-string 'cpe:2.3:o:linux:*'\n\n"
    " # Get match criteria for a CVE\n"
    " nvdb cpe matches --cve CVE-2021-44228",
)

# Console for stderr (progress, errors, warnings)
console = Console(stderr=True)


def _spinner() -> Progress:
    """Build the indeterminate spinner shared by every command (drawn on ``console``)."""
    return Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )


def _render_results(
    results: List[Any],
    output_format: str,
    render_table: Callable[[List[Any]], Any],
) -> None:
    """Emit a non-empty result list in the requested format.

    Args:
        results: Items collected from the API.
        output_format: ``"json"`` or ``"yaml"``; any other value selects the table.
        render_table: Table formatter matching the item type of ``results``.
    """
    if output_format == "json":
        format_json_lines(results)
    elif output_format == "yaml":
        for item in results:
            format_yaml(item)
            # NOTE(review): the separator is written to the stderr console; if
            # format_yaml writes to stdout the two streams diverge when piped.
            # Kept as-is pending confirmation of where format_yaml prints.
            console.print("---")
    else:
        render_table(results)


def _report_not_found(error: Any) -> None:
    """Print a not-found error (its ``message`` attribute) in the CLI's error style."""
    console.print(f"[red]Error:[/red] {error.message}")


# NOTE: the docstrings of the command functions below double as the help text
# Typer shows to users, so they are kept terse; maintainer notes live in comments.


# `get`: fetch one CPE by UUID and print it as a table, JSON or YAML.
# Exits with status 1 when the client raises NotFoundError.
@app.command("get")
def get_cpe(
    cpe_id: str = typer.Argument(..., help="CPE Name UUID"),
    api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
    output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
    """Get details for a specific CPE by UUID."""

    async def _get() -> None:
        # Imported lazily, mirroring how the search command resolves this name.
        from ...exceptions import NotFoundError

        try:
            async with NVDClient(api_key=api_key) as client:
                with _spinner() as progress:
                    progress.add_task(f"Fetching CPE {cpe_id}...", total=None)
                    cpe = await client.cpe.get_cpe(cpe_id)
        except NotFoundError as e:
            # Same presentation as `search`: formatted message instead of a traceback.
            _report_not_found(e)
            raise typer.Exit(1)

        if output_format == "json":
            format_json(cpe)
        elif output_format == "yaml":
            format_yaml(cpe)
        else:
            format_cpe_table([cpe])

    asyncio.run(_get())


# `search`: stream CPEs matching a keyword and/or match string, stopping once
# `limit` results have been collected. `limit` is validated to be >= 1 because
# the loop always keeps the first item before it checks the limit.
@app.command("search")
def search_cpes(
    keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in CPE titles (e.g., 'windows' or 'apache')"),
    match_string: Optional[str] = typer.Option(None, "--match-string", "-m", help="CPE match string in cpe:2.3 format (e.g., 'cpe:2.3:a:vendor:product:*')"),
    limit: int = typer.Option(20, "--limit", "-l", min=1, help="Maximum number of results"),
    api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
    output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
    """Search for CPEs.

    Use --keyword for simple text search in product names.
    Use --match-string for CPE formatted strings (cpe:2.3:...).
    """

    async def _search() -> None:
        from ...exceptions import NotFoundError

        results = []
        try:
            async with NVDClient(api_key=api_key) as client:
                with _spinner() as progress:
                    task = progress.add_task("Searching CPEs...", total=None)
                    async for cpe in client.cpe.search_cpes(
                        keyword_search=keyword,
                        cpe_match_string=match_string,
                    ):
                        results.append(cpe)
                        # Stop right after the limit is reached so the iterator
                        # is not asked for another item.
                        if len(results) >= limit:
                            break
                        progress.update(task, description=f"Searching CPEs... ({len(results)} found)")
        except NotFoundError as e:
            _report_not_found(e)
            if match_string:
                console.print("\n[yellow]Tip:[/yellow] Use --keyword for text search, or use CPE format for --match-string")
                console.print("Example: [blue]nvdb cpe search --keyword 'soft-serve'[/blue]")
                console.print("Or: [blue]nvdb cpe search --match-string 'cpe:2.3:a:*:soft*'[/blue]")
            raise typer.Exit(1)

        if not results:
            console.print("[yellow]No CPEs found matching criteria[/yellow]")
            return

        _render_results(results, output_format, format_cpe_table)

    asyncio.run(_search())


# `matches`: stream CPE match criteria filtered by CVE ID and/or criteria UUID,
# stopping once `limit` results have been collected (`limit` must be >= 1).
# Exits with status 1 when the client raises NotFoundError.
@app.command("matches")
def get_matches(
    cve_id: Optional[str] = typer.Option(None, "--cve", help="CVE ID to get match criteria for"),
    match_criteria_id: Optional[str] = typer.Option(None, "--id", help="Specific match criteria UUID"),
    limit: int = typer.Option(20, "--limit", "-l", min=1, help="Maximum number of results"),
    api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
    output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
) -> None:
    """Get CPE match criteria."""

    async def _matches() -> None:
        from ...exceptions import NotFoundError

        results = []
        try:
            async with NVDClient(api_key=api_key) as client:
                with _spinner() as progress:
                    task = progress.add_task("Fetching match criteria...", total=None)
                    async for match in client.cpematch.search_match_criteria(
                        cve_id=cve_id,
                        match_criteria_id=match_criteria_id,
                    ):
                        results.append(match)
                        if len(results) >= limit:
                            break
                        progress.update(task, description=f"Fetching match criteria... ({len(results)} found)")
        except NotFoundError as e:
            _report_not_found(e)
            raise typer.Exit(1)

        if not results:
            console.print("[yellow]No match criteria found[/yellow]")
            return

        _render_results(results, output_format, format_match_criteria_table)

    asyncio.run(_matches())