aboutsummaryrefslogtreecommitdiffstats
path: root/src/nvd/cli/commands/cve.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvd/cli/commands/cve.py')
-rw-r--r--src/nvd/cli/commands/cve.py150
1 files changed, 150 insertions, 0 deletions
diff --git a/src/nvd/cli/commands/cve.py b/src/nvd/cli/commands/cve.py
new file mode 100644
index 0000000..3f28ac7
--- /dev/null
+++ b/src/nvd/cli/commands/cve.py
@@ -0,0 +1,150 @@
+"""CVE CLI commands."""
+
+import asyncio
+import sys
+from typing import Optional
+
+import typer
+from rich.console import Console
+from rich.progress import Progress, SpinnerColumn, TextColumn
+
+from ...client import NVDClient
+from ..formatters import format_cve_table, format_json, format_json_lines, format_yaml
+
+CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}
+
+app = typer.Typer(
+ no_args_is_help=True,
+ help="Query CVE (vulnerability) information from the NVD",
+ context_settings=CONTEXT_SETTINGS,
+ epilog="Examples:\n"
+ " nvdb cve get CVE-2021-44228\n"
+ " nvdb cve search --keyword 'sql injection' --severity HIGH\n"
+ " nvdb cve search --has-kev --limit 20\n"
+ " nvdb cve history CVE-2021-44228",
+)
+
+# Console for stderr (progress, errors, warnings)
+console = Console(stderr=True)
+
+
+@app.command("get")
+def get_cve(
+ cve_id: str = typer.Argument(..., help="CVE ID (e.g., CVE-2021-44228)"),
+ api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
+ output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
+) -> None:
+ """Get details for a specific CVE."""
+
+ async def _get() -> None:
+ async with NVDClient(api_key=api_key) as client:
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ console=console,
+ ) as progress:
+ progress.add_task(f"Fetching {cve_id}...", total=None)
+ cve = await client.cve.get_cve(cve_id)
+
+ if output_format == "json":
+ format_json(cve)
+ elif output_format == "yaml":
+ format_yaml(cve)
+ else:
+ format_cve_table([cve])
+
+ asyncio.run(_get())
+
+
+@app.command("search")
+def search_cves(
+ keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in descriptions"),
+ cpe: Optional[str] = typer.Option(None, "--cpe", help="CPE name to filter by"),
+ severity: Optional[str] = typer.Option(None, "--severity", "-s", help="CVSS v3 severity: LOW, MEDIUM, HIGH, CRITICAL"),
+ cvss_v2_severity: Optional[str] = typer.Option(None, "--cvss-v2-severity", help="CVSS v2 severity"),
+ cvss_v3_severity: Optional[str] = typer.Option(None, "--cvss-v3-severity", help="CVSS v3 severity"),
+ cwe_id: Optional[str] = typer.Option(None, "--cwe", help="CWE ID (e.g., CWE-79)"),
+ has_kev: bool = typer.Option(False, "--has-kev", help="Only CVEs in CISA KEV catalog"),
+ pub_start: Optional[str] = typer.Option(None, "--pub-start", help="Publication start date (ISO-8601)"),
+ pub_end: Optional[str] = typer.Option(None, "--pub-end", help="Publication end date (ISO-8601)"),
+ mod_start: Optional[str] = typer.Option(None, "--mod-start", help="Last modified start date (ISO-8601)"),
+ mod_end: Optional[str] = typer.Option(None, "--mod-end", help="Last modified end date (ISO-8601)"),
+ limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"),
+ api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
+ output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"),
+) -> None:
+ """Search for CVEs with various filters."""
+
+ async def _search() -> None:
+ results = []
+ async with NVDClient(api_key=api_key) as client:
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ console=console,
+ ) as progress:
+ task = progress.add_task("Searching CVEs...", total=None)
+
+ async for cve in client.cve.search_cves(
+ keyword_search=keyword,
+ cpe_name=cpe,
+ cvss_v2_severity=cvss_v2_severity,
+ cvss_v3_severity=cvss_v3_severity or severity,
+ cwe_id=cwe_id,
+ has_kev=has_kev if has_kev else None,
+ pub_start_date=pub_start,
+ pub_end_date=pub_end,
+ last_mod_start_date=mod_start,
+ last_mod_end_date=mod_end,
+ ):
+ results.append(cve)
+ if len(results) >= limit:
+ break
+ progress.update(task, description=f"Searching CVEs... ({len(results)} found)")
+
+ if not results:
+ console.print("[yellow]No CVEs found matching criteria[/yellow]")
+ return
+
+ if output_format == "json":
+ format_json_lines(results)
+ elif output_format == "yaml":
+ for cve in results:
+ format_yaml(cve)
+ console.print("---")
+ else:
+ format_cve_table(results)
+
+ asyncio.run(_search())
+
+
+@app.command("history")
+def get_history(
+ cve_id: str = typer.Argument(..., help="CVE ID to get history for"),
+ api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"),
+ output_format: str = typer.Option("json", "--format", "-f", help="Output format: json, yaml"),
+) -> None:
+ """Get change history for a CVE."""
+
+ async def _history() -> None:
+ async with NVDClient(api_key=api_key) as client:
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ console=console,
+ ) as progress:
+ progress.add_task(f"Fetching history for {cve_id}...", total=None)
+ history = await client.history.get_cve_history(cve_id)
+
+ if not history:
+ console.print(f"[yellow]No history found for {cve_id}[/yellow]")
+ return
+
+ if output_format == "yaml":
+ for change in history:
+ format_yaml(change)
+ console.print("---")
+ else:
+ format_json_lines(history)
+
+ asyncio.run(_history())