diff options
| author | Louis Burda <dev@sinitax.com> | 2026-01-30 03:04:01 +0100 |
|---|---|---|
| committer | Louis Burda <dev@sinitax.com> | 2026-01-30 03:04:01 +0100 |
| commit | f6487c615cff023db1574e2c23db78bf02a43709 (patch) | |
| tree | 8a0e793a8ea28b2a5eef5dcd509b6c6a2466ee1c /src/nvd/cli/commands/cve.py | |
| download | nvdb-py-main.tar.gz nvdb-py-main.zip | |
Diffstat (limited to 'src/nvd/cli/commands/cve.py')
| -rw-r--r-- | src/nvd/cli/commands/cve.py | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/src/nvd/cli/commands/cve.py b/src/nvd/cli/commands/cve.py new file mode 100644 index 0000000..3f28ac7 --- /dev/null +++ b/src/nvd/cli/commands/cve.py @@ -0,0 +1,150 @@ +"""CVE CLI commands.""" + +import asyncio +import sys +from typing import Optional + +import typer +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ...client import NVDClient +from ..formatters import format_cve_table, format_json, format_json_lines, format_yaml + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Query CVE (vulnerability) information from the NVD", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " nvdb cve get CVE-2021-44228\n" + " nvdb cve search --keyword 'sql injection' --severity HIGH\n" + " nvdb cve search --has-kev --limit 20\n" + " nvdb cve history CVE-2021-44228", +) + +# Console for stderr (progress, errors, warnings) +console = Console(stderr=True) + + +@app.command("get") +def get_cve( + cve_id: str = typer.Argument(..., help="CVE ID (e.g., CVE-2021-44228)"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get details for a specific CVE.""" + + async def _get() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching {cve_id}...", total=None) + cve = await client.cve.get_cve(cve_id) + + if output_format == "json": + format_json(cve) + elif output_format == "yaml": + format_yaml(cve) + else: + format_cve_table([cve]) + + asyncio.run(_get()) + + +@app.command("search") +def search_cves( + keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in descriptions"), + cpe: Optional[str] = typer.Option(None, "--cpe", help="CPE name to filter by"), + severity: Optional[str] = typer.Option(None, "--severity", "-s", help="CVSS v3 severity: LOW, MEDIUM, HIGH, CRITICAL"), + cvss_v2_severity: Optional[str] = typer.Option(None, "--cvss-v2-severity", help="CVSS v2 severity"), + cvss_v3_severity: Optional[str] = typer.Option(None, "--cvss-v3-severity", help="CVSS v3 severity"), + cwe_id: Optional[str] = typer.Option(None, "--cwe", help="CWE ID (e.g., CWE-79)"), + has_kev: bool = typer.Option(False, "--has-kev", help="Only CVEs in CISA KEV catalog"), + pub_start: Optional[str] = typer.Option(None, "--pub-start", help="Publication start date (ISO-8601)"), + pub_end: Optional[str] = typer.Option(None, "--pub-end", help="Publication end date (ISO-8601)"), + mod_start: Optional[str] = typer.Option(None, "--mod-start", help="Last modified start date (ISO-8601)"), + mod_end: Optional[str] = typer.Option(None, "--mod-end", help="Last modified end date (ISO-8601)"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Search for CVEs with various filters.""" + + async def _search() -> None: + results = [] + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Searching CVEs...", total=None) + + async for cve in client.cve.search_cves( + keyword_search=keyword, + cpe_name=cpe, + cvss_v2_severity=cvss_v2_severity, + cvss_v3_severity=cvss_v3_severity or severity, + cwe_id=cwe_id, + has_kev=has_kev if has_kev else None, + pub_start_date=pub_start, + pub_end_date=pub_end, + last_mod_start_date=mod_start, + last_mod_end_date=mod_end, + ): + results.append(cve) + if len(results) >= limit: + break + progress.update(task, description=f"Searching CVEs... ({len(results)} found)") + + if not results: + console.print("[yellow]No CVEs found matching criteria[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for cve in results: + format_yaml(cve) + console.print("---") + else: + format_cve_table(results) + + asyncio.run(_search()) + + +@app.command("history") +def get_history( + cve_id: str = typer.Argument(..., help="CVE ID to get history for"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("json", "--format", "-f", help="Output format: json, yaml"), +) -> None: + """Get change history for a CVE.""" + + async def _history() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching history for {cve_id}...", total=None) + history = await client.history.get_cve_history(cve_id) + + if not history: + console.print(f"[yellow]No history found for {cve_id}[/yellow]") + return + + if output_format == "yaml": + for change in history: + format_yaml(change) + console.print("---") + else: + format_json_lines(history) + + asyncio.run(_history()) |
