"""CVE CLI commands.""" import asyncio import sys from typing import Optional import typer from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn from ...client import NVDClient from ..formatters import format_cve_table, format_json, format_json_lines, format_yaml CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} app = typer.Typer( no_args_is_help=True, help="Query CVE (vulnerability) information from the NVD", context_settings=CONTEXT_SETTINGS, epilog="Examples:\n" " nvdb cve get CVE-2021-44228\n" " nvdb cve search --keyword 'sql injection' --severity HIGH\n" " nvdb cve search --has-kev --limit 20\n" " nvdb cve history CVE-2021-44228", ) # Console for stderr (progress, errors, warnings) console = Console(stderr=True) @app.command("get") def get_cve( cve_id: str = typer.Argument(..., help="CVE ID (e.g., CVE-2021-44228)"), api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), ) -> None: """Get details for a specific CVE.""" async def _get() -> None: async with NVDClient(api_key=api_key) as client: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: progress.add_task(f"Fetching {cve_id}...", total=None) cve = await client.cve.get_cve(cve_id) if output_format == "json": format_json(cve) elif output_format == "yaml": format_yaml(cve) else: format_cve_table([cve]) asyncio.run(_get()) @app.command("search") def search_cves( keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in descriptions"), cpe: Optional[str] = typer.Option(None, "--cpe", help="CPE name to filter by"), severity: Optional[str] = typer.Option(None, "--severity", "-s", help="CVSS v3 severity: LOW, MEDIUM, HIGH, CRITICAL"), cvss_v2_severity: Optional[str] = typer.Option(None, "--cvss-v2-severity", help="CVSS v2 severity"), cvss_v3_severity: Optional[str] = typer.Option(None, "--cvss-v3-severity", help="CVSS v3 severity"), cwe_id: Optional[str] = typer.Option(None, "--cwe", help="CWE ID (e.g., CWE-79)"), has_kev: bool = typer.Option(False, "--has-kev", help="Only CVEs in CISA KEV catalog"), pub_start: Optional[str] = typer.Option(None, "--pub-start", help="Publication start date (ISO-8601)"), pub_end: Optional[str] = typer.Option(None, "--pub-end", help="Publication end date (ISO-8601)"), mod_start: Optional[str] = typer.Option(None, "--mod-start", help="Last modified start date (ISO-8601)"), mod_end: Optional[str] = typer.Option(None, "--mod-end", help="Last modified end date (ISO-8601)"), limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), ) -> None: """Search for CVEs with various filters.""" async def _search() -> None: results = [] async with NVDClient(api_key=api_key) as client: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task("Searching CVEs...", total=None) async for cve in client.cve.search_cves( keyword_search=keyword, cpe_name=cpe, cvss_v2_severity=cvss_v2_severity, cvss_v3_severity=cvss_v3_severity or severity, cwe_id=cwe_id, has_kev=has_kev if has_kev else None, pub_start_date=pub_start, pub_end_date=pub_end, last_mod_start_date=mod_start, last_mod_end_date=mod_end, ): results.append(cve) if len(results) >= limit: break progress.update(task, description=f"Searching CVEs... ({len(results)} found)") if not results: console.print("[yellow]No CVEs found matching criteria[/yellow]") return if output_format == "json": format_json_lines(results) elif output_format == "yaml": for cve in results: format_yaml(cve) console.print("---") else: format_cve_table(results) asyncio.run(_search()) @app.command("history") def get_history( cve_id: str = typer.Argument(..., help="CVE ID to get history for"), api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), output_format: str = typer.Option("json", "--format", "-f", help="Output format: json, yaml"), ) -> None: """Get change history for a CVE.""" async def _history() -> None: async with NVDClient(api_key=api_key) as client: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: progress.add_task(f"Fetching history for {cve_id}...", total=None) history = await client.history.get_cve_history(cve_id) if not history: console.print(f"[yellow]No history found for {cve_id}[/yellow]") return if output_format == "yaml": for change in history: format_yaml(change) console.print("---") else: format_json_lines(history) asyncio.run(_history())