diff options
| author | Louis Burda <dev@sinitax.com> | 2026-01-30 03:04:01 +0100 |
|---|---|---|
| committer | Louis Burda <dev@sinitax.com> | 2026-01-30 03:04:01 +0100 |
| commit | f6487c615cff023db1574e2c23db78bf02a43709 (patch) | |
| tree | 8a0e793a8ea28b2a5eef5dcd509b6c6a2466ee1c /src/nvd/cli | |
| download | nvdb-py-main.tar.gz nvdb-py-main.zip | |
Diffstat (limited to 'src/nvd/cli')
| -rw-r--r-- | src/nvd/cli/__init__.py | 1 | ||||
| -rw-r--r-- | src/nvd/cli/commands/__init__.py | 1 | ||||
| -rw-r--r-- | src/nvd/cli/commands/config.py | 95 | ||||
| -rw-r--r-- | src/nvd/cli/commands/cpe.py | 170 | ||||
| -rw-r--r-- | src/nvd/cli/commands/cve.py | 150 | ||||
| -rw-r--r-- | src/nvd/cli/formatters.py | 137 | ||||
| -rw-r--r-- | src/nvd/cli/main.py | 72 |
7 files changed, 626 insertions, 0 deletions
diff --git a/src/nvd/cli/__init__.py b/src/nvd/cli/__init__.py new file mode 100644 index 0000000..73851cd --- /dev/null +++ b/src/nvd/cli/__init__.py @@ -0,0 +1 @@ +"""CLI package for NVD API.""" diff --git a/src/nvd/cli/commands/__init__.py b/src/nvd/cli/commands/__init__.py new file mode 100644 index 0000000..3f4c467 --- /dev/null +++ b/src/nvd/cli/commands/__init__.py @@ -0,0 +1 @@ +"""CLI commands package.""" diff --git a/src/nvd/cli/commands/config.py b/src/nvd/cli/commands/config.py new file mode 100644 index 0000000..46da597 --- /dev/null +++ b/src/nvd/cli/commands/config.py @@ -0,0 +1,95 @@ +"""Configuration CLI commands.""" + +import os +import sys +from pathlib import Path +from typing import Optional + +import typer +import yaml +from rich.console import Console +from rich.table import Table + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Manage nvdb configuration (API keys, settings)", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " nvdb config show\n" + " nvdb config set-api-key YOUR_API_KEY\n" + " nvdb config clear", +) + +# Console for stderr (status messages, errors) +console = Console(stderr=True) + +CONFIG_DIR = Path.home() / ".config" / "nvd" +CONFIG_FILE = CONFIG_DIR / "config.yaml" + + +def load_config() -> dict: + """Load configuration from file.""" + if not CONFIG_FILE.exists(): + return {} + with open(CONFIG_FILE) as f: + return yaml.safe_load(f) or {} + + +def save_config(config: dict) -> None: + """Save configuration to file.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + with open(CONFIG_FILE, "w") as f: + yaml.dump(config, f) + + +@app.command("set-api-key") +def set_api_key( + api_key: str = typer.Argument(..., help="Your NVD API key"), +) -> None: + """Set the NVD API key in config file.""" + config = load_config() + config["api_key"] = api_key + save_config(config) + console.print(f"[green]API key saved to {CONFIG_FILE}[/green]") + console.print("[yellow]Tip: You can also set the NVD_API_KEY environment variable[/yellow]") + + +@app.command("show") +def show_config() -> None: + """Show current configuration.""" + config = load_config() + env_api_key = os.getenv("NVD_API_KEY") + + table = Table(title="NVD API Configuration", show_header=True, header_style="bold magenta") + table.add_column("Setting", style="cyan") + table.add_column("Value", style="green") + table.add_column("Source", style="yellow") + + if config.get("api_key"): + masked_key = config["api_key"][:8] + "..." if len(config["api_key"]) > 8 else "***" + table.add_row("API Key", masked_key, f"Config file ({CONFIG_FILE})") + elif env_api_key: + masked_key = env_api_key[:8] + "..." if len(env_api_key) > 8 else "***" + table.add_row("API Key", masked_key, "Environment variable") + else: + table.add_row("API Key", "Not set", "N/A") + + console.print(table) + + if not config.get("api_key") and not env_api_key: + console.print("\n[yellow]No API key configured. Using unauthenticated access (5 req/30s)[/yellow]") + console.print("[blue]Set an API key for higher rate limits (50 req/30s):[/blue]") + console.print(" nvdb config set-api-key YOUR_KEY") + console.print(" or export NVD_API_KEY=YOUR_KEY") + + +@app.command("clear") +def clear_config() -> None: + """Clear saved configuration.""" + if CONFIG_FILE.exists(): + CONFIG_FILE.unlink() + console.print("[green]Configuration cleared[/green]") + else: + console.print("[yellow]No configuration file found[/yellow]") diff --git a/src/nvd/cli/commands/cpe.py b/src/nvd/cli/commands/cpe.py new file mode 100644 index 0000000..16a6cbc --- /dev/null +++ b/src/nvd/cli/commands/cpe.py @@ -0,0 +1,170 @@ +"""CPE CLI commands.""" + +import asyncio +import sys +from typing import Optional + +import typer +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ...client import NVDClient +from ..formatters import ( + format_cpe_table, + format_json, + format_json_lines, + format_match_criteria_table, + format_yaml, +) + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Query CPE (product) information from the NVD", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " # Keyword search (recommended for most searches)\n" + " nvdb cpe search --keyword 'apache'\n" + " nvdb cpe search --keyword 'windows 10'\n\n" + " # CPE match string (requires cpe:2.3 format)\n" + " nvdb cpe search --match-string 'cpe:2.3:a:microsoft:*'\n" + " nvdb cpe search --match-string 'cpe:2.3:o:linux:*'\n\n" + " # Get match criteria for a CVE\n" + " nvdb cpe matches --cve CVE-2021-44228", +) + +# Console for stderr (progress, errors, warnings) +console = Console(stderr=True) + + +@app.command("get") +def get_cpe( + cpe_id: str = typer.Argument(..., help="CPE Name UUID"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get details for a specific CPE by UUID.""" + + async def _get() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching CPE {cpe_id}...", total=None) + cpe = await client.cpe.get_cpe(cpe_id) + + if output_format == "json": + format_json(cpe) + elif output_format == "yaml": + format_yaml(cpe) + else: + format_cpe_table([cpe]) + + asyncio.run(_get()) + + +@app.command("search") +def search_cpes( + keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in CPE titles (e.g., 'windows' or 'apache')"), + match_string: Optional[str] = typer.Option(None, "--match-string", "-m", help="CPE match string in cpe:2.3 format (e.g., 'cpe:2.3:a:vendor:product:*')"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Search for CPEs. + + Use --keyword for simple text search in product names. + Use --match-string for CPE formatted strings (cpe:2.3:...). + """ + + async def _search() -> None: + from ...exceptions import NotFoundError + + results = [] + try: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Searching CPEs...", total=None) + + async for cpe in client.cpe.search_cpes( + keyword_search=keyword, + cpe_match_string=match_string, + ): + results.append(cpe) + if len(results) >= limit: + break + progress.update(task, description=f"Searching CPEs... ({len(results)} found)") + + if not results: + console.print("[yellow]No CPEs found matching criteria[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for cpe in results: + format_yaml(cpe) + console.print("---") + else: + format_cpe_table(results) + except NotFoundError as e: + console.print(f"[red]Error:[/red] {e.message}") + if match_string: + console.print("\n[yellow]Tip:[/yellow] Use --keyword for text search, or use CPE format for --match-string") + console.print("Example: [blue]nvdb cpe search --keyword 'soft-serve'[/blue]") + console.print("Or: [blue]nvdb cpe search --match-string 'cpe:2.3:a:*:soft*'[/blue]") + raise typer.Exit(1) + + asyncio.run(_search()) + + +@app.command("matches") +def get_matches( + cve_id: Optional[str] = typer.Option(None, "--cve", help="CVE ID to get match criteria for"), + match_criteria_id: Optional[str] = typer.Option(None, "--id", help="Specific match criteria UUID"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get CPE match criteria.""" + + async def _matches() -> None: + results = [] + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Fetching match criteria...", total=None) + + async for match in client.cpematch.search_match_criteria( + cve_id=cve_id, + match_criteria_id=match_criteria_id, + ): + results.append(match) + if len(results) >= limit: + break + progress.update(task, description=f"Fetching match criteria... ({len(results)} found)") + + if not results: + console.print("[yellow]No match criteria found[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for match in results: + format_yaml(match) + console.print("---") + else: + format_match_criteria_table(results) + + asyncio.run(_matches()) diff --git a/src/nvd/cli/commands/cve.py b/src/nvd/cli/commands/cve.py new file mode 100644 index 0000000..3f28ac7 --- /dev/null +++ b/src/nvd/cli/commands/cve.py @@ -0,0 +1,150 @@ +"""CVE CLI commands.""" + +import asyncio +import sys +from typing import Optional + +import typer +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn + +from ...client import NVDClient +from ..formatters import format_cve_table, format_json, format_json_lines, format_yaml + +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + no_args_is_help=True, + help="Query CVE (vulnerability) information from the NVD", + context_settings=CONTEXT_SETTINGS, + epilog="Examples:\n" + " nvdb cve get CVE-2021-44228\n" + " nvdb cve search --keyword 'sql injection' --severity HIGH\n" + " nvdb cve search --has-kev --limit 20\n" + " nvdb cve history CVE-2021-44228", +) + +# Console for stderr (progress, errors, warnings) +console = Console(stderr=True) + + +@app.command("get") +def get_cve( + cve_id: str = typer.Argument(..., help="CVE ID (e.g., CVE-2021-44228)"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Get details for a specific CVE.""" + + async def _get() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching {cve_id}...", total=None) + cve = await client.cve.get_cve(cve_id) + + if output_format == "json": + format_json(cve) + elif output_format == "yaml": + format_yaml(cve) + else: + format_cve_table([cve]) + + asyncio.run(_get()) + + +@app.command("search") +def search_cves( + keyword: Optional[str] = typer.Option(None, "--keyword", "-k", help="Keyword to search in descriptions"), + cpe: Optional[str] = typer.Option(None, "--cpe", help="CPE name to filter by"), + severity: Optional[str] = typer.Option(None, "--severity", "-s", help="CVSS v3 severity: LOW, MEDIUM, HIGH, CRITICAL"), + cvss_v2_severity: Optional[str] = typer.Option(None, "--cvss-v2-severity", help="CVSS v2 severity"), + cvss_v3_severity: Optional[str] = typer.Option(None, "--cvss-v3-severity", help="CVSS v3 severity"), + cwe_id: Optional[str] = typer.Option(None, "--cwe", help="CWE ID (e.g., CWE-79)"), + has_kev: bool = typer.Option(False, "--has-kev", help="Only CVEs in CISA KEV catalog"), + pub_start: Optional[str] = typer.Option(None, "--pub-start", help="Publication start date (ISO-8601)"), + pub_end: Optional[str] = typer.Option(None, "--pub-end", help="Publication end date (ISO-8601)"), + mod_start: Optional[str] = typer.Option(None, "--mod-start", help="Last modified start date (ISO-8601)"), + mod_end: Optional[str] = typer.Option(None, "--mod-end", help="Last modified end date (ISO-8601)"), + limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of results"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("table", "--format", "-f", help="Output format: table, json, yaml"), +) -> None: + """Search for CVEs with various filters.""" + + async def _search() -> None: + results = [] + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + task = progress.add_task("Searching CVEs...", total=None) + + async for cve in client.cve.search_cves( + keyword_search=keyword, + cpe_name=cpe, + cvss_v2_severity=cvss_v2_severity, + cvss_v3_severity=cvss_v3_severity or severity, + cwe_id=cwe_id, + has_kev=has_kev if has_kev else None, + pub_start_date=pub_start, + pub_end_date=pub_end, + last_mod_start_date=mod_start, + last_mod_end_date=mod_end, + ): + results.append(cve) + if len(results) >= limit: + break + progress.update(task, description=f"Searching CVEs... ({len(results)} found)") + + if not results: + console.print("[yellow]No CVEs found matching criteria[/yellow]") + return + + if output_format == "json": + format_json_lines(results) + elif output_format == "yaml": + for cve in results: + format_yaml(cve) + console.print("---") + else: + format_cve_table(results) + + asyncio.run(_search()) + + +@app.command("history") +def get_history( + cve_id: str = typer.Argument(..., help="CVE ID to get history for"), + api_key: Optional[str] = typer.Option(None, "--api-key", envvar="NVD_API_KEY", help="NVD API key"), + output_format: str = typer.Option("json", "--format", "-f", help="Output format: json, yaml"), +) -> None: + """Get change history for a CVE.""" + + async def _history() -> None: + async with NVDClient(api_key=api_key) as client: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task(f"Fetching history for {cve_id}...", total=None) + history = await client.history.get_cve_history(cve_id) + + if not history: + console.print(f"[yellow]No history found for {cve_id}[/yellow]") + return + + if output_format == "yaml": + for change in history: + format_yaml(change) + console.print("---") + else: + format_json_lines(history) + + asyncio.run(_history()) diff --git a/src/nvd/cli/formatters.py b/src/nvd/cli/formatters.py new file mode 100644 index 0000000..fa95e9e --- /dev/null +++ b/src/nvd/cli/formatters.py @@ -0,0 +1,137 @@ +"""Output formatters for CLI.""" + +import json +import sys +from typing import Any, List + +import yaml +from rich.console import Console +from rich.table import Table + +from ..models import CPEData, CPEMatchString, CVEData, SourceData + +# Console for data output - explicitly use stdout +console = Console(file=sys.stdout, stderr=False) + + +def format_json(data: Any) -> None: + """Format output as JSON.""" + if hasattr(data, "model_dump"): + output = data.model_dump(mode="json", exclude_none=True) + else: + output = data + print(json.dumps(output, indent=2, default=str, ensure_ascii=False)) + + +def format_json_lines(data: List[Any]) -> None: + """Format multiple items as JSON lines (one per line).""" + for item in data: + if hasattr(item, "model_dump"): + output = item.model_dump(mode="json", exclude_none=True) + else: + output = item + print(json.dumps(output, default=str)) + + +def format_yaml(data: Any) -> None: + """Format output as YAML.""" + if hasattr(data, "model_dump"): + output = data.model_dump(mode="json", exclude_none=True) + else: + output = data + print(yaml.dump(output, default_flow_style=False, sort_keys=False, allow_unicode=True), end="") + + +def format_cve_table(cves: List[CVEData]) -> None: + """Format CVEs as a table.""" + table = Table(title="CVE Results", show_header=True, header_style="bold magenta") + table.add_column("CVE ID", style="cyan", no_wrap=True) + table.add_column("Published", style="green") + table.add_column("CVSS v3", justify="right", style="yellow") + table.add_column("Status", style="blue") + table.add_column("Description", style="white", max_width=60) + + for cve in cves: + score = cve.cvss_v3_score or cve.cvss_v2_score + score_str = f"{score:.1f}" if score else "N/A" + + description = cve.description[:100] + "..." if len(cve.description) > 100 else cve.description + + table.add_row( + cve.id, + cve.published.strftime("%Y-%m-%d"), + score_str, + cve.vulnStatus, + description, + ) + + console.print(table) + + +def format_cpe_table(cpes: List[CPEData]) -> None: + """Format CPEs as a table.""" + table = Table(title="CPE Results", show_header=True, header_style="bold magenta") + table.add_column("CPE Name", style="cyan", no_wrap=True, max_width=50) + table.add_column("Title", style="green", max_width=40) + table.add_column("Deprecated", style="yellow") + table.add_column("Last Modified", style="blue") + + for cpe in cpes: + table.add_row( + cpe.cpeName, + cpe.title, + "Yes" if cpe.deprecated else "No", + cpe.lastModified.strftime("%Y-%m-%d"), + ) + + console.print(table) + + +def format_match_criteria_table(matches: List[CPEMatchString]) -> None: + """Format CPE match criteria as a table.""" + table = Table(title="CPE Match Criteria", show_header=True, header_style="bold magenta") + table.add_column("Criteria", style="cyan", max_width=50) + table.add_column("Status", style="green") + table.add_column("Version Range", style="yellow", max_width=30) + + for match in matches: + version_range = "" + if match.versionStartIncluding: + version_range += f">={match.versionStartIncluding} " + if match.versionStartExcluding: + version_range += f">{match.versionStartExcluding} " + if match.versionEndIncluding: + version_range += f"<={match.versionEndIncluding} " + if match.versionEndExcluding: + version_range += f"<{match.versionEndExcluding} " + + table.add_row( + match.criteria, + match.status, + version_range.strip() or "All versions", + ) + + console.print(table) + + +def format_source_table(sources: List[SourceData]) -> None: + """Format sources as a table.""" + table = Table(title="Data Sources", show_header=True, header_style="bold magenta") + table.add_column("Name", style="cyan") + table.add_column("Contact Email", style="green") + table.add_column("Identifiers", style="yellow", max_width=40) + table.add_column("Created", style="blue") + + for source in sources: + identifiers = ", ".join(source.sourceIdentifiers[:3]) + if len(source.sourceIdentifiers) > 3: + identifiers += f" (+{len(source.sourceIdentifiers) - 3} more)" + + table.add_row( + source.name, + source.contactEmail, + identifiers, + source.created.strftime("%Y-%m-%d"), + ) + + console.print(table) diff --git a/src/nvd/cli/main.py b/src/nvd/cli/main.py new file mode 100644 index 0000000..fc90fe5 --- /dev/null +++ b/src/nvd/cli/main.py @@ -0,0 +1,72 @@ +"""Main CLI entry point.""" + +import sys + +import typer +from rich.console import Console +from rich.panel import Panel +from rich.text import Text + +from .commands import config, cpe, cve + +# Enable -h as shorthand for --help +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]} + +app = typer.Typer( + name="nvdb", + help="NVD API CLI - Query the US National Vulnerability Database", + add_completion=False, + rich_markup_mode="rich", + context_settings=CONTEXT_SETTINGS, +) + +# Console for stderr (help messages, examples) +console = Console(stderr=True) + + +def show_examples() -> None: + """Show usage examples.""" + examples = Text() + examples.append("Quick Examples:\n\n", style="bold cyan") + examples.append(" # Get a specific CVE\n", style="dim") + examples.append(" nvdb cve get CVE-2021-44228\n\n", style="green") + examples.append(" # Search for critical vulnerabilities\n", style="dim") + examples.append(" nvdb cve search --severity CRITICAL --limit 10\n\n", style="green") + examples.append(" # Search CVEs in CISA KEV catalog\n", style="dim") + examples.append(" nvdb cve search --has-kev --limit 20\n\n", style="green") + examples.append(" # Search CPEs\n", style="dim") + examples.append(" nvdb cpe search --keyword 'windows 10'\n\n", style="green") + examples.append(" # Configure API key\n", style="dim") + examples.append(" nvdb config set-api-key YOUR_API_KEY\n\n", style="green") + examples.append("Get help for specific commands:\n", style="bold yellow") + examples.append(" nvdb cve --help\n", style="blue") + examples.append(" nvdb cpe --help\n", style="blue") + examples.append(" nvdb config --help\n", style="blue") + + console.print(Panel(examples, title="[bold]nvdb - NVD API CLI[/bold]", border_style="blue")) + + +@app.callback(invoke_without_command=True) +def main_callback(ctx: typer.Context) -> None: + """Main callback to show examples when no command is provided.""" + if ctx.invoked_subcommand is None and len(sys.argv) == 1: + show_examples() + raise typer.Exit() + + +# Register command groups +app.add_typer(cve.app, name="cve", help="CVE (vulnerability) commands") +app.add_typer(cpe.app, name="cpe", help="CPE (product) commands") +app.add_typer(config.app, name="config", help="Configuration commands") + + +@app.command() +def version() -> None: + """Show version information.""" + from .. import __version__ + + console.print(f"nvdb-py version: {__version__}") + + +if __name__ == "__main__": + app() |
